This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/csharp_dev by this push:
new 0185295 Complete simple consumer example
0185295 is described below
commit 0185295c5412e017b19e0b8b59de2c6c2e36576b
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Sep 1 17:26:55 2022 +0800
Complete simple consumer example
---
csharp/examples/Program.cs | 44 ++++++++++++-------------
csharp/rocketmq-client-csharp/Client.cs | 9 ++---
csharp/rocketmq-client-csharp/ClientManager.cs | 5 +++
csharp/rocketmq-client-csharp/IClient.cs | 2 +-
csharp/rocketmq-client-csharp/RpcClient.cs | 7 ++--
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 7 +++-
6 files changed, 44 insertions(+), 30 deletions(-)
diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index f3c6027..abc89ce 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -123,27 +123,27 @@ namespace examples
static async Task Main(string[] args)
{
var credentialsProvider = new ConfigFileCredentialsProvider();
- // var producer = new Producer(ACCESS_URL)
- // {
- // CredentialsProvider = credentialsProvider
- // };
- // producer.AddTopicOfInterest(STANDARD_TOPIC);
- // producer.AddTopicOfInterest(FIFO_TOPIC);
- // producer.AddTopicOfInterest(TIMED_TOPIC);
- // producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
- //
- // await producer.Start();
- //
- // var sendReceiptOfStandardMessage = await SendStandardMessage(producer);
- // Console.WriteLine($"Standard message-id: {sendReceiptOfStandardMessage.MessageId}");
- //
- // var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
- // Console.WriteLine($"FIFO message-id: {sendReceiptOfFifoMessage.MessageId}");
- //
- // var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
- // Console.WriteLine($"Timed message-id: {sendReceiptOfTimedMessage.MessageId}");
- //
- // await producer.Shutdown();
+ var producer = new Producer(ACCESS_URL)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+ producer.AddTopicOfInterest(STANDARD_TOPIC);
+ producer.AddTopicOfInterest(FIFO_TOPIC);
+ producer.AddTopicOfInterest(TIMED_TOPIC);
+ producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
+
+ await producer.Start();
+
+ var sendReceiptOfStandardMessage = await SendStandardMessage(producer);
+ Console.WriteLine($"Standard message-id: {sendReceiptOfStandardMessage.MessageId}");
+
+ var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
+ Console.WriteLine($"FIFO message-id: {sendReceiptOfFifoMessage.MessageId}");
+
+ var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
+ Console.WriteLine($"Timed message-id: {sendReceiptOfTimedMessage.MessageId}");
+
+ await producer.Shutdown();
Console.WriteLine("Now start a simple consumer");
var simpleConsumer = new SimpleConsumer(ACCESS_URL,
CONCURRENT_GROUP)
@@ -156,7 +156,7 @@ namespace examples
await ConsumeAndAckMessages(simpleConsumer);
- // await simpleConsumer.Shutdown();
+ await simpleConsumer.Shutdown();
Console.ReadKey();
}
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 675b9eb..2e6a6ec 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -449,12 +449,13 @@ namespace Org.Apache.Rocketmq
return await Manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
}
- public async Task<bool> NotifyClientTermination()
+ public async Task<bool> NotifyClientTermination(rmq.Resource group)
{
List<string> endpoints = AvailableBrokerEndpoints();
- var request = new rmq::NotifyClientTerminationRequest();
-
-
+ var request = new rmq::NotifyClientTerminationRequest
+ {
+ Group = group
+ };
var metadata = new grpc.Metadata();
Signature.Sign(this, metadata);
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index a64cdf7..c2e00e8 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -191,6 +191,11 @@ namespace Org.Apache.Rocketmq
Logger.Warn("TooManyRequest: servers throttled");
break;
}
+ case rmq.Code.MessageNotFound:
+ {
+ Logger.Info("No message is found in the server");
+ break;
+ }
default:
{
Logger.Warn("Unknown error status");
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index a96e940..b1e992a 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq
Task Heartbeat();
- Task<bool> NotifyClientTermination();
+ Task<bool> NotifyClientTermination(rmq.Resource group);
void BuildClientSetting(rmq::Settings settings);
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index dc9d753..50cd8e9 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -34,9 +34,11 @@ namespace Org.Apache.Rocketmq
protected static readonly Logger Logger =
MqLogManager.Instance.GetCurrentClassLogger();
private readonly rmq::MessagingService.MessagingServiceClient _stub;
private readonly GrpcChannel _channel;
+ private readonly string _target;
public RpcClient(string target)
{
+ _target = target;
_channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
{
HttpHandler = CreateHttpHandler()
@@ -125,15 +127,16 @@ namespace Org.Apache.Rocketmq
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
var call = _stub.ReceiveMessage(request, callOptions);
+ Logger.Debug($"ReceiveMessageRequest has been written to {_target}");
var result = new List<rmq::ReceiveMessageResponse>();
var stream = call.ResponseStream;
while (await stream.MoveNext())
{
var entry = stream.Current;
- Logger.Debug($"Got ReceiveMessageResponse {entry}");
+ Logger.Debug($"Got ReceiveMessageResponse {entry} from {_target}");
result.Add(entry);
}
- Logger.Debug($"Receiving of messages completed");
+ Logger.Debug($"Receiving messages from {_target} completed");
return result;
}
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index b9d83ce..60f0ba9 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -78,7 +78,12 @@ namespace Org.Apache.Rocketmq
{
_scanAssignmentCts.Cancel();
await base.Shutdown();
- if (!await NotifyClientTermination())
+ var group = new rmq.Resource()
+ {
+ Name = _group,
+ ResourceNamespace = "",
+ };
+ if (!await NotifyClientTermination(group))
{
Logger.Warn("Failed to NotifyClientTermination");
}