This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 9da25a0 Complete SimpleConsumer example (#222)
9da25a0 is described below
commit 9da25a0f86ad426b75abece4d457357f2becd045
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Sep 1 18:53:20 2022 +0800
Complete SimpleConsumer example (#222)
* Clean up code
* WIP: prepare to fix
* WIP: bugfix
* Complete simple consumer example
---
csharp/examples/Program.cs | 70 +++++--
csharp/rocketmq-client-csharp/AccessPoint.cs | 28 +--
csharp/rocketmq-client-csharp/Client.cs | 265 ++++++++++++++----------
csharp/rocketmq-client-csharp/ClientConfig.cs | 24 +--
csharp/rocketmq-client-csharp/ClientManager.cs | 15 +-
csharp/rocketmq-client-csharp/IClient.cs | 6 +-
csharp/rocketmq-client-csharp/IClientConfig.cs | 4 -
csharp/rocketmq-client-csharp/Producer.cs | 4 +-
csharp/rocketmq-client-csharp/RpcClient.cs | 9 +-
csharp/rocketmq-client-csharp/Session.cs | 48 ++---
csharp/rocketmq-client-csharp/Signature.cs | 10 +-
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 232 ++++++++++++---------
csharp/tests/ClientManagerTest.cs | 2 +-
csharp/tests/ProducerTest.cs | 138 ++++++------
csharp/tests/RpcClientTest.cs | 6 +-
csharp/tests/SignatureTest.cs | 11 +-
csharp/tests/SimpleConsumerTest.cs | 20 +-
17 files changed, 498 insertions(+), 394 deletions(-)
diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index e0d3851..abc89ce 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -23,11 +23,13 @@ namespace examples
{
class Program
{
- private const string accessUrl =
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
- private const string standardTopic = "sdk_standard";
- private const string fifoTopic = "sdk_fifo";
- private const string timedTopic = "sdk_timed";
- private const string transactionalTopic = "sdk_transactional";
+ private const string ACCESS_URL =
"rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+ private const string STANDARD_TOPIC = "sdk_standard";
+ private const string FIFO_TOPIC = "sdk_fifo";
+ private const string TIMED_TOPIC = "sdk_timed";
+ private const string TRANSACTIONAL_TOPIC = "sdk_transactional";
+
+ private const string CONCURRENT_GROUP = "sdk_concurrency";
private static async Task<SendReceipt> SendStandardMessage(Producer
producer)
{
@@ -40,7 +42,7 @@ namespace examples
"k2"
};
- var msg = new Message(standardTopic, body)
+ var msg = new Message(STANDARD_TOPIC, body)
{
// Tag the massage. A message has at most one tag.
Tag = "Tag-0",
@@ -63,7 +65,7 @@ namespace examples
"k2"
};
- var msg = new Message(fifoTopic, body)
+ var msg = new Message(FIFO_TOPIC, body)
{
// Tag the massage. A message has at most one tag.
Tag = "Tag-0",
@@ -89,7 +91,7 @@ namespace examples
"k2"
};
- var msg = new Message(timedTopic, body)
+ var msg = new Message(TIMED_TOPIC, body)
{
// Tag the massage. A message has at most one tag.
Tag = "Tag-0",
@@ -100,27 +102,61 @@ namespace examples
msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30);
return await producer.Send(msg);
}
+
+ private static async Task ConsumeAndAckMessages(SimpleConsumer
simpleConsumer)
+ {
+ var messages = await simpleConsumer.Receive(32,
TimeSpan.FromSeconds(60));
+ if (null != messages)
+ {
+ var tasks = new List<Task>();
+ foreach (var message in messages)
+ {
+ Console.WriteLine($"Receive a message,
topic={message.Topic}, message-id={message.MessageId}");
+ var task = simpleConsumer.Ack(message);
+ tasks.Add(task);
+ }
+ await Task.WhenAll(tasks);
+ Console.WriteLine($"{tasks.Count} messages have been
acknowledged");
+ }
+ }
static async Task Main(string[] args)
{
var credentialsProvider = new ConfigFileCredentialsProvider();
- var producer = new Producer(accessUrl);
- producer.CredentialsProvider = credentialsProvider;
- producer.AddTopicOfInterest(standardTopic);
- producer.AddTopicOfInterest(fifoTopic);
- producer.AddTopicOfInterest(timedTopic);
- producer.AddTopicOfInterest(transactionalTopic);
-
+ var producer = new Producer(ACCESS_URL)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+ producer.AddTopicOfInterest(STANDARD_TOPIC);
+ producer.AddTopicOfInterest(FIFO_TOPIC);
+ producer.AddTopicOfInterest(TIMED_TOPIC);
+ producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
+
await producer.Start();
-
+
var sendReceiptOfStandardMessage = await
SendStandardMessage(producer);
Console.WriteLine($"Standard message-id:
{sendReceiptOfStandardMessage.MessageId}");
-
+
var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
Console.WriteLine($"FIFO message-id:
{sendReceiptOfFifoMessage.MessageId}");
var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
Console.WriteLine($"Timed message-id:
{sendReceiptOfTimedMessage.MessageId}");
+
+ await producer.Shutdown();
+
+ Console.WriteLine("Now start a simple consumer");
+ var simpleConsumer = new SimpleConsumer(ACCESS_URL,
CONCURRENT_GROUP)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+
+ simpleConsumer.Subscribe(STANDARD_TOPIC, new FilterExpression("*",
ExpressionType.TAG));
+ await simpleConsumer.Start();
+
+ await ConsumeAndAckMessages(simpleConsumer);
+
+ await simpleConsumer.Shutdown();
Console.ReadKey();
}
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs
b/csharp/rocketmq-client-csharp/AccessPoint.cs
index ab29273..f05fa29 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -37,35 +37,27 @@ namespace Org.Apache.Rocketmq
throw new ArgumentException("Access url should be of format
host:port");
}
- _host = segments[0];
- _port = Int32.Parse(segments[1]);
+ Host = segments[0];
+ Port = Int32.Parse(segments[1]);
}
-
- private string _host;
- public string Host
- {
- get { return _host; }
- set { _host = value; }
- }
+ public string Host { get; }
- private int _port;
+ public int Port { get; set; }
- public int Port
+ public string TargetUrl()
{
- get { return _port; }
- set { _port = value; }
+ return $"https://{Host}:{Port}";
}
- public string TargetUrl()
+ public rmq::AddressScheme HostScheme()
{
- return $"https://{_host}:{_port}";
+ return SchemeOf(Host);
}
- public rmq::AddressScheme HostScheme()
+ private static rmq::AddressScheme SchemeOf(string host)
{
- IPAddress ip;
- bool result = IPAddress.TryParse(_host, out ip);
+ var result = IPAddress.TryParse(host, out var ip);
if (!result)
{
return rmq::AddressScheme.DomainName;
diff --git a/csharp/rocketmq-client-csharp/Client.cs
b/csharp/rocketmq-client-csharp/Client.cs
index 0fe4ec9..2e6a6ec 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -22,7 +22,7 @@ using System.Threading;
using System.Diagnostics;
using System;
using rmq = Apache.Rocketmq.V2;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using NLog;
using System.Diagnostics.Metrics;
@@ -37,34 +37,39 @@ namespace Org.Apache.Rocketmq
AccessPoint = new AccessPoint(accessUrl);
AccessPointScheme = AccessPoint.HostScheme();
- var serviceEndpoint = new rmq::Address();
- serviceEndpoint.Host = AccessPoint.Host;
- serviceEndpoint.Port = AccessPoint.Port;
+ var serviceEndpoint = new rmq::Address
+ {
+ Host = AccessPoint.Host,
+ Port = AccessPoint.Port
+ };
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
_resourceNamespace = "";
- ClientSettings = new rmq::Settings();
+ ClientSettings = new rmq::Settings
+ {
+ AccessPoint = new rmq::Endpoints
+ {
+ Scheme = AccessPoint.HostScheme()
+ }
+ };
- ClientSettings.AccessPoint = new rmq::Endpoints();
- ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
ClientSettings.RequestTimeout =
Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
- ClientSettings.UserAgent = new rmq.UA();
- ClientSettings.UserAgent.Language = rmq::Language.DotNet;
- ClientSettings.UserAgent.Version = "5.0.0";
- ClientSettings.UserAgent.Platform =
Environment.OSVersion.ToString();
- ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+ ClientSettings.UserAgent = new rmq.UA
+ {
+ Language = rmq::Language.DotNet,
+ Version = "5.0.0",
+ Platform = Environment.OSVersion.ToString(),
+ Hostname = System.Net.Dns.GetHostName()
+ };
- _manager = new ClientManager();
+ Manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string,
TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
-
- _healthCheckCts = new CancellationTokenSource();
-
_telemetryCts = new CancellationTokenSource();
}
@@ -72,36 +77,37 @@ namespace Org.Apache.Rocketmq
{
Schedule(async () =>
{
+ Logger.Debug("Update topic route by schedule");
await UpdateTopicRoute();
}, 30, _updateTopicRouteCts.Token);
// Get routes for topics of interest.
+ Logger.Debug("Step of #Start: get route for topics of interest");
await UpdateTopicRoute();
string accessPointUrl = AccessPoint.TargetUrl();
CreateSession(accessPointUrl);
-
await
_sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
-
+ Logger.Debug($"Session has been created for {accessPointUrl}");
await Heartbeat();
}
public virtual async Task Shutdown()
{
- Logger.Info($"Shutdown
client[resource-namespace={_resourceNamespace}");
+ Logger.Info($"Shutdown client");
_updateTopicRouteCts.Cancel();
_telemetryCts.Cancel();
- await _manager.Shutdown();
+ await Manager.Shutdown();
}
- protected string FilterBroker(Func<string, bool> acceptor)
+ private string FilterBroker(Func<string, bool> acceptor)
{
foreach (var item in _topicRouteTable)
{
foreach (var partition in item.Value.MessageQueues)
{
- string target = Utilities.TargetUrl(partition);
+ var target = Utilities.TargetUrl(partition);
if (acceptor(target))
{
return target;
@@ -116,7 +122,7 @@ namespace Org.Apache.Rocketmq
*/
private List<string> AvailableBrokerEndpoints()
{
- List<string> endpoints = new List<string>();
+ var endpoints = new List<string>();
foreach (var item in _topicRouteTable)
{
foreach (var partition in item.Value.MessageQueues)
@@ -149,7 +155,7 @@ namespace Org.Apache.Rocketmq
List<string> topicList = new List<string>();
topicList.AddRange(topics);
- List<Task<TopicRouteData>> tasks = new
List<Task<TopicRouteData>>();
+ var tasks = new List<Task<TopicRouteData>>();
foreach (var item in topicList)
{
tasks.Add(GetRouteFor(item, true));
@@ -188,7 +194,7 @@ namespace Org.Apache.Rocketmq
}
}
- public void Schedule(Action action, int seconds, CancellationToken
token)
+ protected void Schedule(Action action, int seconds, CancellationToken
token)
{
if (null == action)
{
@@ -213,46 +219,51 @@ namespace Org.Apache.Rocketmq
* direct
* Indicate if we should by-pass cache and fetch route entries from
name server.
*/
- public async Task<TopicRouteData> GetRouteFor(string topic, bool
direct)
+ protected async Task<TopicRouteData> GetRouteFor(string topic, bool
direct)
{
+ Logger.Debug($"Get route for topic={topic}, direct={direct}");
if (!direct && _topicRouteTable.ContainsKey(topic))
{
+ Logger.Debug($"Return cached route for {topic}");
return _topicRouteTable[topic];
}
// We got one or more name servers available.
- var request = new rmq::QueryRouteRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = AccessPointScheme;
+ var request = new rmq::QueryRouteRequest
+ {
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ },
+ Endpoints = new rmq::Endpoints
+ {
+ Scheme = AccessPointScheme
+ }
+ };
foreach (var address in AccessPointEndpoints)
{
request.Endpoints.Addresses.Add(address);
}
var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
int index = _random.Next(0, AccessPointEndpoints.Count);
var serviceEndpoint = AccessPointEndpoints[index];
// AccessPointAddresses.Count
string target =
$"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
- TopicRouteData topicRouteData;
try
{
Logger.Debug($"Resolving route for topic={topic}");
- topicRouteData = await _manager.ResolveRoute(target, metadata,
request, RequestTimeout);
+ var topicRouteData = await Manager.ResolveRoute(target,
metadata, request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name
server");
_topicRouteTable.TryAdd(topic, topicRouteData);
+ Logger.Debug($"Got route for {topic} from {target}");
return topicRouteData;
}
- else
- {
- Logger.Warn($"Failed to query route of {topic} from
{target}");
- }
+ Logger.Warn($"Failed to query route of {topic} from {target}");
}
catch (Exception e)
{
@@ -273,16 +284,20 @@ namespace Org.Apache.Rocketmq
return;
}
- var request = new rmq::HeartbeatRequest();
+ var request = new rmq::HeartbeatRequest
+ {
+ Group = null,
+ ClientType = rmq.ClientType.Unspecified
+ };
PrepareHeartbeatData(request);
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
{
- tasks.Add(_manager.Heartbeat(endpoint, metadata, request,
RequestTimeout));
+ tasks.Add(Manager.Heartbeat(endpoint, metadata, request,
RequestTimeout));
}
await Task.WhenAll(tasks);
@@ -303,15 +318,24 @@ namespace Org.Apache.Rocketmq
{
// Pick a broker randomly
string target = FilterBroker((s) => true);
- var request = new rmq::QueryAssignmentRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = AccessPointScheme;
+ var request = new rmq::QueryAssignmentRequest
+ {
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ },
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ Endpoints = new rmq::Endpoints
+ {
+ Scheme = AccessPointScheme
+ }
+ };
+
foreach (var endpoint in AccessPointEndpoints)
{
request.Endpoints.Addresses.Add(endpoint);
@@ -319,8 +343,8 @@ namespace Org.Apache.Rocketmq
try
{
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- return await _manager.QueryLoadAssignment(target, metadata,
request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ return await Manager.QueryLoadAssignment(target, metadata,
request, RequestTimeout);
}
catch (System.Exception e)
{
@@ -344,88 +368,102 @@ namespace Org.Apache.Rocketmq
settings.MergeFrom(ClientSettings);
}
- public void CreateSession(string url)
+ private async Task CreateSession(string url)
{
+ Logger.Debug($"Create session for url={url}");
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- var stream = _manager.Telemetry(url, metadata);
+ Signature.Sign(this, metadata);
+ var stream = Manager.Telemetry(url, metadata);
var session = new Session(url, stream, this);
_sessions.TryAdd(url, session);
- Task.Run(async () =>
- {
- await session.Loop();
- });
+ await session.Loop();
}
-
- public async Task<List<Message>> ReceiveMessage(rmq::Assignment
assignment, string group)
+ internal async Task<List<Message>> ReceiveMessage(rmq::Assignment
assignment, string group)
{
var targetUrl = TargetUrl(assignment);
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- var request = new rmq::ReceiveMessageRequest();
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
- request.MessageQueue = assignment.MessageQueue;
- var messages = await _manager.ReceiveMessage(targetUrl, metadata,
request, getLongPollingTimeout());
+ Signature.Sign(this, metadata);
+ var request = new rmq::ReceiveMessageRequest
+ {
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ MessageQueue = assignment.MessageQueue
+ };
+ var messages = await Manager.ReceiveMessage(targetUrl, metadata,
request,
+ ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
return messages;
}
public async Task<Boolean> Ack(string target, string group, string
topic, string receiptHandle, String messageId)
{
- var request = new rmq::AckMessageRequest();
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
-
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
-
- var entry = new rmq::AckMessageEntry();
- entry.ReceiptHandle = receiptHandle;
- entry.MessageId = messageId;
+ var request = new rmq::AckMessageRequest
+ {
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ }
+ };
+
+ var entry = new rmq::AckMessageEntry
+ {
+ ReceiptHandle = receiptHandle,
+ MessageId = messageId
+ };
request.Entries.Add(entry);
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- return await _manager.Ack(target, metadata, request,
RequestTimeout);
+ Signature.Sign(this, metadata);
+ return await Manager.Ack(target, metadata, request,
RequestTimeout);
}
public async Task<Boolean> ChangeInvisibleDuration(string target,
string group, string topic, string receiptHandle, String messageId)
{
- var request = new rmq::ChangeInvisibleDurationRequest();
- request.ReceiptHandle = receiptHandle;
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
-
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
-
- request.MessageId = messageId;
+ var request = new rmq::ChangeInvisibleDurationRequest
+ {
+ ReceiptHandle = receiptHandle,
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ },
+ MessageId = messageId
+ };
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- return await _manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ return await Manager.ChangeInvisibleDuration(target, metadata,
request, RequestTimeout);
}
- public async Task<bool> NotifyClientTermination()
+ public async Task<bool> NotifyClientTermination(rmq.Resource group)
{
List<string> endpoints = AvailableBrokerEndpoints();
- var request = new rmq::NotifyClientTerminationRequest();
-
-
+ var request = new rmq::NotifyClientTerminationRequest
+ {
+ Group = group
+ };
var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
List<Task<Boolean>> tasks = new List<Task<Boolean>>();
foreach (var endpoint in endpoints)
{
- tasks.Add(_manager.NotifyClientTermination(endpoint, metadata,
request, RequestTimeout));
+ tasks.Add(Manager.NotifyClientTermination(endpoint, metadata,
request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
@@ -439,7 +477,7 @@ namespace Org.Apache.Rocketmq
return true;
}
- public virtual void OnSettingsReceived(rmq::Settings settings)
+ internal virtual void OnSettingsReceived(rmq::Settings settings)
{
if (null != settings.Metric)
{
@@ -452,9 +490,24 @@ namespace Org.Apache.Rocketmq
ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
}
+
+ switch (settings.PubSubCase)
+ {
+ case rmq.Settings.PubSubOneofCase.Publishing:
+ {
+ ClientSettings.Publishing = settings.Publishing;
+ break;
+ }
+
+ case rmq.Settings.PubSubOneofCase.Subscription:
+ {
+ ClientSettings.Subscription = settings.Subscription;
+ break;
+ }
+ }
}
- protected readonly IClientManager _manager;
+ protected readonly IClientManager Manager;
private readonly HashSet<string> _topicsOfInterest = new
HashSet<string>();
@@ -465,9 +518,7 @@ namespace Org.Apache.Rocketmq
private readonly ConcurrentDictionary<string, TopicRouteData>
_topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
-
- private readonly CancellationTokenSource _healthCheckCts;
-
+
private readonly CancellationTokenSource _telemetryCts;
public CancellationTokenSource TelemetryCts()
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs
b/csharp/rocketmq-client-csharp/ClientConfig.cs
index 0d99cb1..86175a2 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -75,15 +75,6 @@ namespace Org.Apache.Rocketmq
set { credentialsProvider_ = value; }
}
- public string tenantId()
- {
- return _tenantId;
- }
- public string TenantId
- {
- set { _tenantId = value; }
- }
-
public TimeSpan RequestTimeout
{
get
@@ -96,15 +87,6 @@ namespace Org.Apache.Rocketmq
}
}
- public TimeSpan getLongPollingTimeout()
- {
- return longPollingIoTimeout_;
- }
- public TimeSpan LongPollingTimeout
- {
- set { longPollingIoTimeout_ = value; }
- }
-
public string getGroupName()
{
return groupName_;
@@ -139,9 +121,7 @@ namespace Org.Apache.Rocketmq
protected string _resourceNamespace;
private ICredentialsProvider credentialsProvider_;
-
- private string _tenantId;
-
+
private TimeSpan _requestTimeout;
private TimeSpan longPollingIoTimeout_;
@@ -150,7 +130,7 @@ namespace Org.Apache.Rocketmq
private string clientId_;
- private bool tracingEnabled_ = false;
+ private bool tracingEnabled_;
private string instanceName_ = "default";
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs
b/csharp/rocketmq-client-csharp/ClientManager.cs
index a39a0e0..c2e00e8 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -191,6 +191,11 @@ namespace Org.Apache.Rocketmq
Logger.Warn("TooManyRequest: servers
throttled");
break;
}
+ case rmq.Code.MessageNotFound:
+ {
+ Logger.Info("No message is found in the
server");
+ break;
+ }
default:
{
Logger.Warn("Unknown error status");
@@ -221,10 +226,12 @@ namespace Org.Apache.Rocketmq
private Message Convert(string sourceHost, rmq::Message message)
{
- var msg = new Message();
- msg.Topic = message.Topic.Name;
- msg.MessageId = message.SystemProperties.MessageId;
- msg.Tag = message.SystemProperties.Tag;
+ var msg = new Message
+ {
+ Topic = message.Topic.Name,
+ MessageId = message.SystemProperties.MessageId,
+ Tag = message.SystemProperties.Tag
+ };
// Validate message body checksum
byte[] raw = message.Body.ToByteArray();
diff --git a/csharp/rocketmq-client-csharp/IClient.cs
b/csharp/rocketmq-client-csharp/IClient.cs
index 3352028..b1e992a 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -27,12 +27,10 @@ namespace Org.Apache.Rocketmq
Task Heartbeat();
- Task<bool> NotifyClientTermination();
+ Task<bool> NotifyClientTermination(rmq.Resource group);
void BuildClientSetting(rmq::Settings settings);
-
-
- void OnSettingsReceived(rmq::Settings settings);
+
CancellationTokenSource TelemetryCts();
}
diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs
b/csharp/rocketmq-client-csharp/IClientConfig.cs
index 438d7a8..57325b4 100644
--- a/csharp/rocketmq-client-csharp/IClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/IClientConfig.cs
@@ -28,10 +28,6 @@ namespace Org.Apache.Rocketmq
ICredentialsProvider credentialsProvider();
- string tenantId();
-
- TimeSpan getLongPollingTimeout();
-
string getGroupName();
string clientId();
diff --git a/csharp/rocketmq-client-csharp/Producer.cs
b/csharp/rocketmq-client-csharp/Producer.cs
index e337b1a..7d55904 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -161,7 +161,7 @@ namespace Org.Apache.Rocketmq
}
var metadata = new Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
Exception ex = null;
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
{
var stopWatch = new Stopwatch();
stopWatch.Start();
- rmq::SendMessageResponse response = await
_manager.SendMessage(target, metadata, request, RequestTimeout);
+ rmq::SendMessageResponse response = await
Manager.SendMessage(target, metadata, request, RequestTimeout);
if (null != response && rmq::Code.Ok ==
response.Status.Code)
{
var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs
b/csharp/rocketmq-client-csharp/RpcClient.cs
index c1f1cd6..50cd8e9 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -34,9 +34,11 @@ namespace Org.Apache.Rocketmq
protected static readonly Logger Logger =
MqLogManager.Instance.GetCurrentClassLogger();
private readonly rmq::MessagingService.MessagingServiceClient _stub;
private readonly GrpcChannel _channel;
+ private readonly string _target;
public RpcClient(string target)
{
+ _target = target;
_channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
{
HttpHandler = CreateHttpHandler()
@@ -76,7 +78,7 @@ namespace Org.Apache.Rocketmq
public AsyncDuplexStreamingCall<rmq::TelemetryCommand,
rmq::TelemetryCommand> Telemetry(Metadata metadata)
{
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+ var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650));
var callOptions = new CallOptions(metadata, deadline);
return _stub.Telemetry(callOptions);
}
@@ -125,15 +127,16 @@ namespace Org.Apache.Rocketmq
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
var call = _stub.ReceiveMessage(request, callOptions);
+ Logger.Debug($"ReceiveMessageRequest has been written to
{_target}");
var result = new List<rmq::ReceiveMessageResponse>();
var stream = call.ResponseStream;
while (await stream.MoveNext())
{
var entry = stream.Current;
- Logger.Debug($"Got ReceiveMessageResponse {entry}");
+ Logger.Debug($"Got ReceiveMessageResponse {entry} from
{_target}");
result.Add(entry);
}
- Logger.Debug($"Receiving of messages completed");
+ Logger.Debug($"Receiving messages from {_target} completed");
return result;
}
diff --git a/csharp/rocketmq-client-csharp/Session.cs
b/csharp/rocketmq-client-csharp/Session.cs
index a6be057..4d09894 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -18,7 +18,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using NLog;
using rmq = Apache.Rocketmq.V2;
@@ -28,31 +28,33 @@ namespace Org.Apache.Rocketmq
{
private static readonly Logger Logger =
MqLogManager.Instance.GetCurrentClassLogger();
- public Session(string target,
+ public Session(string target,
grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand,
rmq::TelemetryCommand> stream,
- IClient client)
+ Client client)
{
- this._target = target;
- this._stream = stream;
- this._client = client;
- this._channel = Channel.CreateUnbounded<bool>();
+ _target = target;
+ _stream = stream;
+ _client = client;
+ _channel = Channel.CreateUnbounded<bool>();
}
public async Task Loop()
{
- var reader = this._stream.ResponseStream;
- var writer = this._stream.RequestStream;
- var request = new rmq::TelemetryCommand();
- request.Settings = new rmq::Settings();
+ var reader = _stream.ResponseStream;
+ var writer = _stream.RequestStream;
+ var request = new rmq::TelemetryCommand
+ {
+ Settings = new rmq::Settings()
+ };
_client.BuildClientSetting(request.Settings);
await writer.WriteAsync(request);
- Logger.Debug($"Writing Client Settings Done:
{request.Settings.ToString()}");
+ Logger.Debug($"Writing Client Settings to {_target} Done:
{request.Settings}");
while (!_client.TelemetryCts().IsCancellationRequested)
{
if (await reader.MoveNext(_client.TelemetryCts().Token))
{
var cmd = reader.Current;
- Logger.Debug($"Received a TelemetryCommand:
{cmd.ToString()}");
+ Logger.Debug($"Received a TelemetryCommand from {_target}:
{cmd}");
switch (cmd.CommandCase)
{
case rmq::TelemetryCommand.CommandOneofCase.None:
@@ -71,7 +73,7 @@ namespace Org.Apache.Rocketmq
await _channel.Writer.WriteAsync(true);
}
- Logger.Info($"Received settings from server
{cmd.Settings.ToString()}");
+ Logger.Info($"Received settings from
{_target}: {cmd.Settings}");
_client.OnSettingsReceived(cmd.Settings);
break;
}
@@ -90,17 +92,10 @@ namespace Org.Apache.Rocketmq
}
}
}
- Logger.Info("Telemetry stream cancelled");
+ Logger.Info($"Telemetry stream for {_target} is cancelled");
await writer.CompleteAsync();
}
- private string _target;
-
- public string Target
- {
- get { return _target; }
- }
-
public async Task AwaitSettingNegotiationCompletion()
{
if (0 != Interlocked.Read(ref _established))
@@ -112,11 +107,12 @@ namespace Org.Apache.Rocketmq
await _channel.Reader.ReadAsync();
}
- private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand,
rmq::TelemetryCommand> _stream;
- private IClient _client;
+ private readonly grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand,
rmq::TelemetryCommand> _stream;
+ private readonly Client _client;
- private long _established = 0;
+ private long _established;
- private Channel<bool> _channel;
+ private readonly Channel<bool> _channel;
+ private readonly string _target;
};
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs
b/csharp/rocketmq-client-csharp/Signature.cs
index 2331b53..ec78171 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -16,22 +16,18 @@
*/
using System;
using System.Text;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using System.Security.Cryptography;
namespace Org.Apache.Rocketmq
{
- public class Signature
+ public static class Signature
{
- public static void sign(IClientConfig clientConfig, grpc::Metadata
metadata)
+ public static void Sign(IClientConfig clientConfig, grpc::Metadata
metadata)
{
metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
metadata.Add(MetadataConstants.CLIENT_ID_KEY,
clientConfig.clientId());
- if (!String.IsNullOrEmpty(clientConfig.tenantId()))
- {
- metadata.Add(MetadataConstants.TENANT_ID_KEY,
clientConfig.tenantId());
- }
if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
{
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index d4694ac..60f0ba9 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -33,7 +33,6 @@ namespace Org.Apache.Rocketmq
public SimpleConsumer(string accessUrl, string group)
: base(accessUrl)
{
- _fifo = false;
_subscriptions = new ConcurrentDictionary<string,
rmq.SubscriptionEntry>();
_topicAssignments = new ConcurrentDictionary<string,
List<rmq.Assignment>>();
_group = group;
@@ -44,15 +43,20 @@ namespace Org.Apache.Rocketmq
base.BuildClientSetting(settings);
settings.ClientType = rmq::ClientType.SimpleConsumer;
- settings.Subscription = new rmq::Subscription();
- settings.Subscription.Group = new rmq::Resource();
- settings.Subscription.Group.Name = _group;
- settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+ settings.Subscription = new rmq::Subscription
+ {
+ Group = new rmq::Resource
+ {
+ Name = _group,
+ ResourceNamespace = ResourceNamespace
+ }
+ };
foreach (var kv in _subscriptions)
{
settings.Subscription.Subscriptions.Add(kv.Value);
}
+ Logger.Info($"ClientSettings built OK. {settings}");
}
public override async Task Start()
@@ -62,54 +66,70 @@ namespace Org.Apache.Rocketmq
// Scan load assignment periodically
Schedule(async () =>
{
- while (!_scanAssignmentCts.IsCancellationRequested)
- {
- await ScanLoadAssignments();
- }
+ Logger.Debug("Scan load assignments by schedule");
+ await ScanLoadAssignments();
}, 30, _scanAssignmentCts.Token);
await ScanLoadAssignments();
+ Logger.Debug("Step of #Start: ScanLoadAssignments completed");
}
public override async Task Shutdown()
{
+ _scanAssignmentCts.Cancel();
await base.Shutdown();
- if (!await NotifyClientTermination())
+ var group = new rmq.Resource()
+ {
+ Name = _group,
+ ResourceNamespace = "",
+ };
+ if (!await NotifyClientTermination(group))
{
Logger.Warn("Failed to NotifyClientTermination");
}
}
-
+
+ /**
+ * For 5.x, we can assume there is a load-balancer before gateway
nodes.
+ */
private async Task ScanLoadAssignments()
{
-
- List<Task<List<rmq::Assignment>>> tasks = new
List<Task<List<rmq.Assignment>>>();
- List<string> topics = new List<string>();
+ var tasks = new List<Task<List<rmq.Assignment>>>();
+ var topics = new List<string>();
foreach (var sub in _subscriptions)
{
- var request = new rmq::QueryAssignmentRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = ResourceNamespace;
- request.Topic.Name = sub.Key;
+ var request = new rmq::QueryAssignmentRequest
+ {
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = sub.Key
+ }
+ };
topics.Add(sub.Key);
- request.Group = new rmq::Resource();
- request.Group.Name = _group;
- request.Group.ResourceNamespace = ResourceNamespace;
+ request.Group = new rmq::Resource
+ {
+ Name = _group,
+ ResourceNamespace = ResourceNamespace
+ };
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
- var address = new rmq::Address();
- address.Host = AccessPoint.Host;
- address.Port = AccessPoint.Port;
+ request.Endpoints = new rmq::Endpoints
+ {
+ Scheme = AccessPoint.HostScheme()
+ };
+ var address = new rmq::Address
+ {
+ Host = AccessPoint.Host,
+ Port = AccessPoint.Port
+ };
request.Endpoints.Addresses.Add(address);
var metadata = new Metadata();
- Signature.sign(this, metadata);
-
tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata,
request, TimeSpan.FromSeconds(3)));
+ Signature.Sign(this, metadata);
+ tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(),
metadata, request, TimeSpan.FromSeconds(3)));
}
- List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
-
+ var list = await Task.WhenAll(tasks);
var i = 0;
foreach (var assignments in list)
{
@@ -129,36 +149,48 @@ namespace Org.Apache.Rocketmq
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest
request)
{
request.ClientType = rmq::ClientType.SimpleConsumer;
- request.Group = new rmq::Resource();
- request.Group.Name = _group;
- request.Group.ResourceNamespace = ResourceNamespace;
+ request.Group = new rmq::Resource
+ {
+ Name = _group,
+ ResourceNamespace = ResourceNamespace
+ };
}
- public void Subscribe(string topic, rmq::FilterType filterType, string
expression)
+ public void Subscribe(string topic, FilterExpression filterExpression)
{
- var entry = new rmq::SubscriptionEntry();
- entry.Topic = new rmq::Resource();
- entry.Topic.Name = topic;
- entry.Topic.ResourceNamespace = ResourceNamespace;
- entry.Expression = new rmq::FilterExpression();
- entry.Expression.Type = filterType;
- entry.Expression.Expression = expression;
+ var entry = new rmq::SubscriptionEntry
+ {
+ Topic = new rmq::Resource
+ {
+ Name = topic,
+ ResourceNamespace = ResourceNamespace
+ },
+ Expression = new rmq::FilterExpression
+ {
+ Type = filterExpression.Type switch {
+ ExpressionType.TAG => rmq::FilterType.Tag,
+ ExpressionType.SQL92 => rmq::FilterType.Sql,
+ _ => rmq.FilterType.Tag
+ },
+ Expression = filterExpression.Expression
+ }
+ };
+
_subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
AddTopicOfInterest(topic);
}
- public override void OnSettingsReceived(rmq.Settings settings)
+ internal override void OnSettingsReceived(rmq.Settings settings)
{
base.OnSettingsReceived(settings);
if (settings.Subscription.Fifo)
{
- _fifo = true;
Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
}
}
- public async Task<List<Message>> Receive(int batchSize, TimeSpan
timeout)
+ public async Task<List<Message>> Receive(int batchSize, TimeSpan
invisibleDuration)
{
var messageQueue = NextQueue();
if (null == messageQueue)
@@ -167,37 +199,48 @@ namespace Org.Apache.Rocketmq
return new List<Message>();
}
- var request = new rmq.ReceiveMessageRequest();
- request.Group = new rmq.Resource();
- request.Group.ResourceNamespace = ResourceNamespace;
- request.Group.Name = _group;
+ var request = new rmq.ReceiveMessageRequest
+ {
+ Group = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = _group
+ },
+ MessageQueue = new rmq.MessageQueue()
+ };
- request.MessageQueue = new rmq.MessageQueue();
request.MessageQueue.MergeFrom(messageQueue);
request.BatchSize = batchSize;
+ request.InvisibleDuration =
Duration.FromTimeSpan(invisibleDuration);
// Client is responsible of extending message invisibility duration
request.AutoRenew = false;
var targetUrl = Utilities.TargetUrl(messageQueue);
var metadata = new Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
- return await _manager.ReceiveMessage(targetUrl, metadata, request,
timeout);
+ return await Manager.ReceiveMessage(targetUrl, metadata, request,
+ ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
}
public async Task Ack(Message message)
{
- var request = new rmq.AckMessageRequest();
- request.Group = new rmq.Resource();
- request.Group.ResourceNamespace = ResourceNamespace;
- request.Group.Name = _group;
+ var request = new rmq.AckMessageRequest
+ {
+ Group = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = _group
+ },
+ Topic = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = message.Topic
+ }
+ };
- request.Topic = new rmq.Resource();
- request.Topic.ResourceNamespace = ResourceNamespace;
- request.Topic.Name = message.Topic;
-
var entry = new rmq.AckMessageEntry();
request.Entries.Add(entry);
entry.MessageId = message.MessageId;
@@ -205,30 +248,33 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
- Signature.sign(this, metadata);
- await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
}
public async Task ChangeInvisibleDuration(Message message, TimeSpan
invisibleDuration)
{
- var request = new rmq.ChangeInvisibleDurationRequest();
- request.Group = new rmq.Resource();
- request.Group.ResourceNamespace = ResourceNamespace;
- request.Group.Name = _group;
-
- request.Topic = new rmq.Resource();
- request.Topic.ResourceNamespace = ResourceNamespace;
- request.Topic.Name = message.Topic;
-
- request.ReceiptHandle = message._receiptHandle;
- request.MessageId = message.MessageId;
-
- request.InvisibleDuration =
Duration.FromTimeSpan(invisibleDuration);
+ var request = new rmq.ChangeInvisibleDurationRequest
+ {
+ Group = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = _group
+ },
+ Topic = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = message.Topic
+ },
+ ReceiptHandle = message._receiptHandle,
+ MessageId = message.MessageId,
+ InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+ };
var targetUrl = message._sourceHost;
var metadata = new Metadata();
- Signature.sign(this, metadata);
- await _manager.ChangeInvisibleDuration(targetUrl, metadata,
request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ await Manager.ChangeInvisibleDuration(targetUrl, metadata,
request, RequestTimeout);
}
private rmq.MessageQueue NextQueue()
@@ -238,35 +284,35 @@ namespace Org.Apache.Rocketmq
return null;
}
- UInt32 topicSeq = CurrentTopicSequence.Value;
- CurrentTopicSequence.Value = topicSeq + 1;
+ var topicSeq = _currentTopicSequence.Value;
+ _currentTopicSequence.Value = topicSeq + 1;
var total = _topicAssignments.Count;
var topicIndex = topicSeq % total;
var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
- UInt32 queueSeq = CurrentQueueSequence.Value;
- CurrentQueueSequence.Value = queueSeq + 1;
- List<rmq.Assignment> assignments;
- if (_topicAssignments.TryGetValue(topic, out assignments))
+ UInt32 queueSeq = _currentQueueSequence.Value;
+ _currentQueueSequence.Value = queueSeq + 1;
+ if (!_topicAssignments.TryGetValue(topic, out var assignments))
{
- if (null == assignments)
- {
- return null;
- }
- var idx = queueSeq % assignments.Count;
- return assignments[(int)idx].MessageQueue;
-
+ return null;
}
- return null;
+ var idx = queueSeq % assignments?.Count;
+ return assignments?[(int)idx].MessageQueue;
}
- private ThreadLocal<UInt32> CurrentTopicSequence = new
ThreadLocal<UInt32>(true);
- private ThreadLocal<UInt32> CurrentQueueSequence = new
ThreadLocal<UInt32>(true);
+ private readonly ThreadLocal<UInt32> _currentTopicSequence = new
ThreadLocal<UInt32>(true)
+ {
+ Value = 0
+ };
+
+ private readonly ThreadLocal<UInt32> _currentQueueSequence = new
ThreadLocal<UInt32>(true)
+ {
+ Value = 0
+ };
private readonly string _group;
- private bool _fifo;
private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry>
_subscriptions;
private readonly ConcurrentDictionary<string, List<rmq.Assignment>>
_topicAssignments;
private readonly CancellationTokenSource _scanAssignmentCts = new
CancellationTokenSource();
diff --git a/csharp/tests/ClientManagerTest.cs
b/csharp/tests/ClientManagerTest.cs
index af5983c..e12c027 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -48,7 +48,7 @@ namespace Org.Apache.Rocketmq
clientConfig.CredentialsProvider = credentialsProvider;
clientConfig.ResourceNamespace = resourceNamespace;
clientConfig.Region = "cn-hangzhou-pre";
- Signature.sign(clientConfig, metadata);
+ Signature.Sign(clientConfig, metadata);
var clientManager = new ClientManager();
string target = "https://116.62.231.199:80";
var topicRouteData = clientManager.ResolveRoute(target, metadata,
request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index b5094a2..bcf36a5 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -26,30 +26,14 @@ namespace tests
[TestClass]
public class ProducerTest
{
-
- private static AccessPoint _accessPoint;
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- _accessPoint = new AccessPoint
- {
- Host = HOST,
- Port = PORT
- };
- }
-
- [ClassCleanup]
- public static void TearDown()
- {
- }
-
[TestMethod]
public async Task TestLifecycle()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
await producer.Shutdown();
}
@@ -57,21 +41,26 @@ namespace tests
[TestMethod]
public async Task TestSendStandardMessage()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
-
- // Tag the massage. A message has at most one tag.
- msg.Tag = "Tag-0";
-
+ var msg = new Message(TOPIC, body)
+ {
+ // Tag the massage. A message has at most one tag.
+ Tag = "Tag-0"
+ };
+
// Associate the message with one or multiple keys
- var keys = new List<string>();
- keys.Add("k1");
- keys.Add("k2");
+ var keys = new List<string>
+ {
+ "k1",
+ "k2"
+ };
msg.Keys = keys;
var sendResult = await producer.Send(msg);
@@ -82,23 +71,28 @@ namespace tests
[TestMethod]
public async Task TestSendMultipleMessages()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
for (var i = 0; i < 128; i++)
{
- var msg = new Message(topic, body);
-
- // Tag the massage. A message has at most one tag.
- msg.Tag = "Tag-0";
-
+ var msg = new Message(TOPIC, body)
+ {
+ // Tag the massage. A message has at most one tag.
+ Tag = "Tag-0"
+ };
+
// Associate the message with one or multiple keys
- var keys = new List<string>();
- keys.Add("k1");
- keys.Add("k2");
+ var keys = new List<string>
+ {
+ "k1",
+ "k2"
+ };
msg.Keys = keys;
var sendResult = await producer.Send(msg);
Assert.IsNotNull(sendResult);
@@ -109,17 +103,20 @@ namespace tests
[TestMethod]
public async Task TestSendFifoMessage()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
-
- // Messages of the same group will get delivered one after
another.
- msg.MessageGroup = "message-group-0";
-
+ var msg = new Message(TOPIC, body)
+ {
+ // Messages of the same group will get delivered one after
another.
+ MessageGroup = "message-group-0"
+ };
+
// Verify messages are FIFO iff their message group is not null or
empty.
Assert.IsTrue(msg.Fifo());
@@ -131,15 +128,19 @@ namespace tests
[TestMethod]
public async Task TestSendScheduledMessage()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
-
- msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ var msg = new Message(TOPIC, body)
+ {
+ DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+ };
+
Assert.IsTrue(msg.Scheduled());
var sendResult = await producer.Send(msg);
@@ -154,15 +155,19 @@ namespace tests
[TestMethod]
public async Task TestSendMessage_Failure()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
- msg.MessageGroup = "Group-0";
- msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ var msg = new Message(TOPIC, body)
+ {
+ MessageGroup = "Group-0",
+ DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+ };
Assert.IsTrue(msg.Scheduled());
try
@@ -176,10 +181,9 @@ namespace tests
await producer.Shutdown();
}
- private static string topic = "cpp_sdk_standard";
-
- private static string HOST = "127.0.0.1";
- private static int PORT = 8081;
+ private const string TOPIC = "cpp_sdk_standard";
+ private const string HOST = "127.0.0.1";
+ private const int PORT = 8081;
}
}
\ No newline at end of file
diff --git a/csharp/tests/RpcClientTest.cs b/csharp/tests/RpcClientTest.cs
index a1ecf82..b438047 100644
--- a/csharp/tests/RpcClientTest.cs
+++ b/csharp/tests/RpcClientTest.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.Rocketmq
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
- Signature.sign(client_config, metadata);
+ Signature.Sign(client_config, metadata);
var cmd = new rmq::TelemetryCommand();
cmd.Settings = new rmq::Settings();
@@ -107,7 +107,7 @@ namespace Org.Apache.Rocketmq
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
- Signature.sign(client_config, metadata);
+ Signature.Sign(client_config, metadata);
var request = new rmq::QueryRouteRequest();
request.Topic = new rmq::Resource();
request.Topic.Name = "cpp_sdk_standard";
@@ -128,7 +128,7 @@ namespace Org.Apache.Rocketmq
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
- Signature.sign(client_config, metadata);
+ Signature.Sign(client_config, metadata);
var request = new rmq::SendMessageRequest();
var message = new rmq::Message();
diff --git a/csharp/tests/SignatureTest.cs b/csharp/tests/SignatureTest.cs
index 16d0f46..12d1c10 100644
--- a/csharp/tests/SignatureTest.cs
+++ b/csharp/tests/SignatureTest.cs
@@ -15,11 +15,11 @@
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using Moq;
-using System;
+using Org.Apache.Rocketmq;
-namespace Org.Apache.Rocketmq
+namespace tests
{
[TestClass]
@@ -27,11 +27,10 @@ namespace Org.Apache.Rocketmq
{
[TestMethod]
- public void testSign()
+ public void TestSign()
{
var mock = new Mock<IClientConfig>();
mock.Setup(x => x.getGroupName()).Returns("G1");
- mock.Setup(x => x.tenantId()).Returns("Tenant-id");
mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
mock.Setup(x => x.serviceName()).Returns("mq");
mock.Setup(x => x.region()).Returns("cn-hangzhou");
@@ -42,7 +41,7 @@ namespace Org.Apache.Rocketmq
mock.Setup(x =>
x.credentialsProvider()).Returns(credentialsProvider);
var metadata = new grpc::Metadata();
- Signature.sign(mock.Object, metadata);
+ Signature.Sign(mock.Object, metadata);
Assert.IsNotNull(metadata.Get(MetadataConstants.AUTHORIZATION));
}
}
diff --git a/csharp/tests/SimpleConsumerTest.cs
b/csharp/tests/SimpleConsumerTest.cs
index e5fc8f0..6f49eb1 100644
--- a/csharp/tests/SimpleConsumerTest.cs
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -39,7 +39,7 @@ namespace tests
public async Task TestLifecycle()
{
var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
- simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ simpleConsumer.Subscribe(_topic, new FilterExpression("*",
ExpressionType.TAG));
await simpleConsumer.Start();
Thread.Sleep(1_000);
await simpleConsumer.Shutdown();
@@ -49,11 +49,11 @@ namespace tests
public async Task TestReceive()
{
var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
- simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ simpleConsumer.Subscribe(_topic, new FilterExpression("*",
ExpressionType.TAG));
await simpleConsumer.Start();
var batchSize = 32;
- var receiveTimeout = TimeSpan.FromSeconds(10);
- var messages = await simpleConsumer.Receive(batchSize,
receiveTimeout);
+ var invisibleDuration = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize,
invisibleDuration);
Assert.IsTrue(messages.Count > 0);
Assert.IsTrue(messages.Count <= batchSize);
await simpleConsumer.Shutdown();
@@ -64,11 +64,11 @@ namespace tests
public async Task TestAck()
{
var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
- simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ simpleConsumer.Subscribe(_topic, new FilterExpression("*",
ExpressionType.TAG));
await simpleConsumer.Start();
var batchSize = 32;
- var receiveTimeout = TimeSpan.FromSeconds(10);
- var messages = await simpleConsumer.Receive(batchSize,
receiveTimeout);
+ var invisibleDuration = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize,
invisibleDuration);
foreach (var message in messages)
{
await simpleConsumer.Ack(message);
@@ -81,11 +81,11 @@ namespace tests
public async Task TestChangeInvisibleDuration()
{
var simpleConsumer = new SimpleConsumer($"{HOST}:{PORT}", _group);
- simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ simpleConsumer.Subscribe(_topic, new FilterExpression("*",
ExpressionType.TAG));
await simpleConsumer.Start();
var batchSize = 32;
- var receiveTimeout = TimeSpan.FromSeconds(10);
- var messages = await simpleConsumer.Receive(batchSize,
receiveTimeout);
+ var invisibleDuration = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize,
invisibleDuration);
foreach (var message in messages)
{
await simpleConsumer.ChangeInvisibleDuration(message,
TimeSpan.FromSeconds(10));