This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch observability in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
commit a6b1ee9e0525e004a01fa78148da8cf13f3526a5 Author: Li Zhanhui <[email protected]> AuthorDate: Tue Jun 21 17:04:44 2022 +0800 Make Shutdown async --- rocketmq-client-csharp/Client.cs | 6 +++--- rocketmq-client-csharp/IClient.cs | 2 +- rocketmq-client-csharp/IConsumer.cs | 4 +++- rocketmq-client-csharp/IProducer.cs | 2 +- rocketmq-client-csharp/Producer.cs | 4 ++-- rocketmq-client-csharp/PushConsumer.cs | 4 ++-- rocketmq-client-csharp/Session.cs | 2 +- rocketmq-client-csharp/SimpleConsumer.cs | 13 +++++++++---- tests/ProducerTest.cs | 2 +- tests/SimpleConsumerTest.cs | 4 +++- 10 files changed, 26 insertions(+), 17 deletions(-) diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs index 607daf4..28ef42b 100644 --- a/rocketmq-client-csharp/Client.cs +++ b/rocketmq-client-csharp/Client.cs @@ -75,11 +75,11 @@ namespace Org.Apache.Rocketmq }, 30, _updateTopicRouteCts.Token); } - public virtual void Shutdown() + public virtual async Task Shutdown() { Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}"); _updateTopicRouteCts.Cancel(); - Manager.Shutdown().GetAwaiter().GetResult(); + await Manager.Shutdown(); } protected string FilterBroker(Func<string, bool> acceptor) @@ -398,7 +398,7 @@ namespace Org.Apache.Rocketmq return true; } - public virtual void OnReceive(rmq::Settings settings) + public virtual void OnSettingsReceived(rmq::Settings settings) { if (null != settings.Metric) { diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs index 4b7206b..461c452 100644 --- a/rocketmq-client-csharp/IClient.cs +++ b/rocketmq-client-csharp/IClient.cs @@ -31,6 +31,6 @@ namespace Org.Apache.Rocketmq void BuildClientSetting(rmq::Settings settings); - void OnReceive(rmq::Settings settings); + void OnSettingsReceived(rmq::Settings settings); } } \ No newline at end of file diff --git a/rocketmq-client-csharp/IConsumer.cs b/rocketmq-client-csharp/IConsumer.cs index ac4d787..de27f1f 100644 --- a/rocketmq-client-csharp/IConsumer.cs +++ b/rocketmq-client-csharp/IConsumer.cs @@ -14,12 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +using System.Threading.Tasks; namespace Org.Apache.Rocketmq { public interface IConsumer { void Start(); - void Shutdown(); + Task Shutdown(); } } \ No newline at end of file diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs index 9c30c6c..088df5e 100644 --- a/rocketmq-client-csharp/IProducer.cs +++ b/rocketmq-client-csharp/IProducer.cs @@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq { void Start(); - void Shutdown(); + Task Shutdown(); Task<SendReceipt> Send(Message message); } diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs index 32606ae..4a39f33 100644 --- a/rocketmq-client-csharp/Producer.cs +++ b/rocketmq-client-csharp/Producer.cs @@ -39,10 +39,10 @@ namespace Org.Apache.Rocketmq // More initialization } - public override void Shutdown() + public override async Task Shutdown() { // Release local resources - base.Shutdown(); + await base.Shutdown(); } protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request) diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs index 909e7a2..3b37950 100644 --- a/rocketmq-client-csharp/PushConsumer.cs +++ b/rocketmq-client-csharp/PushConsumer.cs @@ -66,13 +66,13 @@ namespace Org.Apache.Rocketmq }, 10, _scanExpiredProcessQueueCTS.Token); } - public override void Shutdown() + public override async Task Shutdown() { _scanAssignmentCTS.Cancel(); _scanExpiredProcessQueueCTS.Cancel(); // Shutdown resources of derived class - base.Shutdown(); + await base.Shutdown(); } private async Task scanLoadAssignments() diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs index f5e7795..51eb09c 100644 --- a/rocketmq-client-csharp/Session.cs +++ b/rocketmq-client-csharp/Session.cs @@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq case rmq::TelemetryCommand.CommandOneofCase.Settings: { Logger.Info($"Received settings from server {cmd.Settings.ToString()}"); - _client.OnReceive(cmd.Settings); + _client.OnSettingsReceived(cmd.Settings); break; } case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand: diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs index 9eaf365..afd447a 100644 --- a/rocketmq-client-csharp/SimpleConsumer.cs +++ b/rocketmq-client-csharp/SimpleConsumer.cs @@ -17,6 +17,7 @@ using rmq = Apache.Rocketmq.V2; using NLog; +using System.Threading.Tasks; using System.Collections.Generic; using System.Collections.Concurrent; using Apache.Rocketmq.V2; @@ -57,9 +58,13 @@ namespace Org.Apache.Rocketmq base.createSession(_accessPoint.TargetUrl()); } - public override void Shutdown() + public override async Task Shutdown() { - base.Shutdown(); + await base.Shutdown(); + if (!await NotifyClientTermination()) + { + Logger.Warn("Failed to NotifyClientTermination"); + } } protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request) @@ -82,9 +87,9 @@ namespace Org.Apache.Rocketmq subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; }); } - public override void OnReceive(Settings settings) + public override void OnSettingsReceived(Settings settings) { - base.OnReceive(settings); + base.OnSettingsReceived(settings); if (settings.Subscription.Fifo) { diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs index a6746ff..baeca17 100644 --- a/tests/ProducerTest.cs +++ b/tests/ProducerTest.cs @@ -52,7 +52,7 @@ namespace Org.Apache.Rocketmq var msg = new Message(topic, body); var sendResult = await producer.Send(msg); Assert.IsNotNull(sendResult); - producer.Shutdown(); + await producer.Shutdown(); } private static string resourceNamespace = ""; diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs index 1bc1a45..29f155f 100644 --- a/tests/SimpleConsumerTest.cs +++ b/tests/SimpleConsumerTest.cs @@ -17,6 +17,7 @@ using System.Threading; using Microsoft.VisualStudio.TestTools.UnitTesting; using rmq = Apache.Rocketmq.V2; +using System.Threading.Tasks; namespace Org.Apache.Rocketmq { @@ -26,7 +27,7 @@ namespace Org.Apache.Rocketmq { [TestMethod] - public void TestStart() + public async Task TestStart() { var accessPoint = new AccessPoint(); var host = "11.166.42.94"; @@ -41,6 +42,7 @@ namespace Org.Apache.Rocketmq simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*"); simpleConsumer.Start(); Thread.Sleep(10_000); + await simpleConsumer.Shutdown(); } }
