This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 008b4553 [ISSUE #903] [Rust] Fix sync settings show an error after the
connection is closed by the server side (#1025)
008b4553 is described below
commit 008b455302a9ad61f61eb1ea42f954e5718b54e3
Author: EnableAsync <[email protected]>
AuthorDate: Thu Jul 3 16:28:53 2025 +0800
[ISSUE #903] [Rust] Fix sync settings show an error after the connection is
closed by the server side (#1025)
---
.github/workflows/java_coverage.yml | 4 +-
.github/workflows/rust_build.yml | 20 +-
.github/workflows/rust_coverage.yaml | 3 +-
protos | 2 +-
rust/Cargo.toml | 6 +-
rust/build.rs | 13 +-
rust/proto/apache/rocketmq/v2/admin.proto | 43 ++
rust/proto/apache/rocketmq/v2/definition.proto | 564 +++++++++++++++++++++++++
rust/proto/apache/rocketmq/v2/service.proto | 411 ++++++++++++++++++
rust/src/client.rs | 23 +-
rust/src/error.rs | 3 +
rust/src/model/common.rs | 2 +-
rust/src/model/message_id.rs | 1 -
rust/src/pb/apache.rocketmq.v2.rs | 7 +-
rust/src/session.rs | 145 ++++---
15 files changed, 1144 insertions(+), 103 deletions(-)
diff --git a/.github/workflows/java_coverage.yml b/.github/workflows/java_coverage.yml
index cd9289d7..dc56e63a 100644
--- a/.github/workflows/java_coverage.yml
+++ b/.github/workflows/java_coverage.yml
@@ -26,9 +26,9 @@ jobs:
paths: "**/TEST-*.xml"
if: always()
- name: Upload to Codecov
- uses: codecov/codecov-action@v3
+ uses: codecov/codecov-action@v4
with:
flags: java
fail_ci_if_error: true
- token: e7eb01be-398b-4f7f-a73e-dc35c428cb50
+ token: ${{ secrets.CODECOV_ACTION_KEY }}
verbose: true
diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
index 90e48ced..ea013939 100644
--- a/.github/workflows/rust_build.yml
+++ b/.github/workflows/rust_build.yml
@@ -3,7 +3,7 @@ on:
workflow_call:
jobs:
fmt:
- name: code style check
+ name: Code style check
runs-on: ubuntu-latest
defaults:
run:
@@ -20,7 +20,7 @@ jobs:
- name: Code format check
run: cargo fmt --check
clippy:
- name: clippy
+ name: Clippy
runs-on: ubuntu-latest
defaults:
run:
@@ -35,10 +35,10 @@ jobs:
toolchain: stable
components: clippy
# Run clippy
- - name: clippy check
+ - name: Clippy check
run: cargo clippy --all-features -- -D warnings
doc:
- name: doc check
+ name: Doc check
runs-on: ubuntu-latest
defaults:
run:
@@ -62,7 +62,7 @@ jobs:
working-directory: ./rust
strategy:
matrix:
- msrv: [1.70.0]
+ msrv: [1.74.0]
name: MSRV ${{ matrix.msrv }} check
steps:
- uses: actions/checkout@v3
@@ -71,7 +71,7 @@ jobs:
- name: Install cargo-msrv
run: |
cargo install cargo-msrv
- cargo msrv --min ${{ matrix.msrv }}
+ cargo msrv verify --min ${{ matrix.msrv }}
build:
name: "${{ matrix.os }}"
runs-on: ${{ matrix.os }}
@@ -82,8 +82,8 @@ jobs:
strategy:
fail-fast: false
matrix:
- os: [ubuntu-20.04, macos-12, windows-2022]
- msrv: [1.70]
+ os: [Ubuntu-22.04, MacOS-15, Windows-2022]
+ msrv: [1.74]
steps:
- uses: actions/checkout@v2
with:
@@ -93,9 +93,9 @@ jobs:
toolchain: ${{ matrix.msrv }}
components: clippy
- name: Install protoc
- uses: arduino/setup-protoc@v1
+ uses: arduino/setup-protoc@v3
with:
- version: "3.x"
+ version: "29.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build
env:
diff --git a/.github/workflows/rust_coverage.yaml b/.github/workflows/rust_coverage.yaml
index 85cbebd0..ad8ab749 100644
--- a/.github/workflows/rust_coverage.yaml
+++ b/.github/workflows/rust_coverage.yaml
@@ -33,9 +33,10 @@ jobs:
run: cargo llvm-cov --all-features --workspace --ignore-filename-regex pb/ --codecov --output-path codecov.json
- name: Upload to codecov.io
- uses: codecov/codecov-action@v3
+ uses: codecov/codecov-action@v4
with:
files: ./rust/codecov.json
flags: rust
verbose: true
+ token: ${{ secrets.CODECOV_ACTION_KEY }}
fail_ci_if_error: true
diff --git a/protos b/protos
index df8f85cd..5c9f8419 160000
--- a/protos
+++ b/protos
@@ -1 +1 @@
-Subproject commit df8f85cdfa850b204adea777076c2d3d8fb0c3fc
+Subproject commit 5c9f84199bffa79b2ed73beb37774ca92e749c19
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index a5f4dcb3..692ab3f5 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -18,7 +18,7 @@
name = "rocketmq"
version = "5.0.0"
edition = "2021"
-rust-version = "1.70"
+rust-version = "1.74"
authors = [
"SSpirits <[email protected]>",
"Zhanhui Li <[email protected]>",
@@ -69,8 +69,8 @@ ring = "0.16.20"
tokio-util = { version = "=0.7.10", features = ["rt"] }
[build-dependencies]
-tonic-build = "0.9.0"
-which = "4.4.0"
+tonic-build = "0.10.2"
+which = "7.0.3"
version_check = "0.9.4"
regex = "1.7.3"
diff --git a/rust/build.rs b/rust/build.rs
index 90a42557..4d52371d 100644
--- a/rust/build.rs
+++ b/rust/build.rs
@@ -35,12 +35,13 @@ fn main() {
.out_dir("src/pb")
.compile(
&[
- "../protos/apache/rocketmq/v2/service.proto",
- "../protos/apache/rocketmq/v2/admin.proto",
+ "./proto/apache/rocketmq/v2/service.proto",
+ "./proto/apache/rocketmq/v2/admin.proto",
+ "./proto/apache/rocketmq/v2/definition.proto",
],
- &["../protos"],
+ &["./proto"],
)
- .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
+ .unwrap_or_else(|e| panic!("Failed to compile proto {e:?}"));
}
fn check_protoc_version() {
@@ -57,12 +58,12 @@ fn check_protoc_version() {
let result = cmd.output();
if result.is_err() {
- panic!("failed to invoke protoc: {:?}", result)
+ panic!("failed to invoke protoc: {result:?}")
}
let output = result.unwrap();
if !output.status.success() {
- panic!("protoc failed: {:?}", output)
+ panic!("protoc failed: {output:?}")
}
let version_regex =
Regex::new(r"(?:(\d+)\.)?(?:(\d+)\.)?(\*|\d+)").unwrap();
diff --git a/rust/proto/apache/rocketmq/v2/admin.proto b/rust/proto/apache/rocketmq/v2/admin.proto
new file mode 100644
index 00000000..7dbb7027
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/admin.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
+
+message ChangeLogLevelRequest {
+ enum Level {
+ TRACE = 0;
+ DEBUG = 1;
+ INFO = 2;
+ WARN = 3;
+ ERROR = 4;
+ }
+ Level level = 1;
+}
+
+message ChangeLogLevelResponse { string remark = 1; }
+
+service Admin {
+ rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/definition.proto b/rust/proto/apache/rocketmq/v2/definition.proto
new file mode 100644
index 00000000..753bfceb
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+ TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+ COMMIT = 1;
+ ROLLBACK = 2;
+}
+
+enum TransactionSource {
+ SOURCE_UNSPECIFIED = 0;
+ SOURCE_CLIENT = 1;
+ SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+ PERMISSION_UNSPECIFIED = 0;
+ NONE = 1;
+ READ = 2;
+ WRITE = 3;
+ READ_WRITE = 4;
+}
+
+enum FilterType {
+ FILTER_TYPE_UNSPECIFIED = 0;
+ TAG = 1;
+ SQL = 2;
+}
+
+message FilterExpression {
+ FilterType type = 1;
+ string expression = 2;
+}
+
+message RetryPolicy {
+ int32 max_attempts = 1;
+ oneof strategy {
+ ExponentialBackoff exponential_backoff = 2;
+ CustomizedBackoff customized_backoff = 3;
+ }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+ google.protobuf.Duration initial = 1;
+ google.protobuf.Duration max = 2;
+ float multiplier = 3;
+}
+
+message CustomizedBackoff {
+ // To support classic backoff strategy which is arbitrary defined by end users.
+ // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
+ repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+ string resource_namespace = 1;
+
+ // Resource name identifier, which remains unique within the abstract resource
+ // namespace.
+ string name = 2;
+}
+
+message SubscriptionEntry {
+ Resource topic = 1;
+ FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+ ADDRESS_SCHEME_UNSPECIFIED = 0;
+ IPv4 = 1;
+ IPv6 = 2;
+ DOMAIN_NAME = 3;
+}
+
+message Address {
+ string host = 1;
+ int32 port = 2;
+}
+
+message Endpoints {
+ AddressScheme scheme = 1;
+ repeated Address addresses = 2;
+}
+
+message Broker {
+ // Name of the broker
+ string name = 1;
+
+ // Broker index. Canonically, index = 0 implies that the broker is playing
+ // leader role while brokers with index > 0 play follower role.
+ int32 id = 2;
+
+ // Address of the broker, complying with the following scheme
+ // 1. dns:[//authority/]host[:port]
+ // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+ // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+ Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+ Resource topic = 1;
+ int32 id = 2;
+ Permission permission = 3;
+ Broker broker = 4;
+ repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+ MESSAGE_TYPE_UNSPECIFIED = 0;
+
+ NORMAL = 1;
+
+ // Sequenced message
+ FIFO = 2;
+
+ // Messages that are delivered after the specified duration.
+ DELAY = 3;
+
+ // Messages that are transactional. Only committed messages are delivered to
+ // subscribers.
+ TRANSACTION = 4;
+}
+
+enum DigestType {
+ DIGEST_TYPE_UNSPECIFIED = 0;
+
+ // CRC algorithm achieves goal of detecting random data error with lowest
+ // computation overhead.
+ CRC32 = 1;
+
+ // MD5 algorithm achieves good balance between collision rate and computation
+ // overhead.
+ MD5 = 2;
+
+ // SHA-family has substantially fewer collision with fair amount of
+ // computation.
+ SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest were detected, brokers need
+// respond client with BAD_REQUEST.
+//
+// For messages subscription, when an invalid digest were detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+message Digest {
+ DigestType type = 1;
+ string checksum = 2;
+}
+
+enum ClientType {
+ CLIENT_TYPE_UNSPECIFIED = 0;
+ PRODUCER = 1;
+ PUSH_CONSUMER = 2;
+ SIMPLE_CONSUMER = 3;
+ PULL_CONSUMER = 4;
+}
+
+enum Encoding {
+ ENCODING_UNSPECIFIED = 0;
+
+ IDENTITY = 1;
+
+ GZIP = 2;
+}
+
+message SystemProperties {
+ // Tag, which is optional.
+ optional string tag = 1;
+
+ // Message keys
+ repeated string keys = 2;
+
+ // Message identifier, client-side generated, remains unique.
+ // if message_id is empty, the send message request will be aborted with
+ // status `INVALID_ARGUMENT`
+ string message_id = 3;
+
+ // Message body digest
+ Digest body_digest = 4;
+
+ // Message body encoding. Candidate options are identity, gzip, snappy etc.
+ Encoding body_encoding = 5;
+
+ // Message type, normal, FIFO or transactional.
+ MessageType message_type = 6;
+
+ // Message born time-point.
+ google.protobuf.Timestamp born_timestamp = 7;
+
+ // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+ string born_host = 8;
+
+ // Time-point at which the message is stored in the broker, which is absent
+ // for message publishing.
+ optional google.protobuf.Timestamp store_timestamp = 9;
+
+ // The broker that stores this message. It may be broker name, IP or arbitrary
+ // identifier that uniquely identify the server.
+ string store_host = 10;
+
+ // Time-point at which broker delivers to clients, which is optional.
+ optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+ // If a message is acquired by way of POP, this field holds the receipt,
+ // which is absent for message publishing.
+ // Clients use the receipt to acknowledge or negatively acknowledge the
+ // message.
+ optional string receipt_handle = 12;
+
+ // Message queue identifier in which a message is physically stored.
+ int32 queue_id = 13;
+
+ // Message-queue offset at which a message is stored, which is absent for
+ // message publishing.
+ optional int64 queue_offset = 14;
+
+ // Period of time servers would remain invisible once a message is acquired.
+ optional google.protobuf.Duration invisible_duration = 15;
+
+ // Business code may failed to process messages for the moment. Hence, clients
+ // may request servers to deliver them again using certain back-off strategy,
+ // the attempt is 1 not 0 if message is delivered first time, and it is absent
+ // for message publishing.
+ optional int32 delivery_attempt = 16;
+
+ // Define the group name of message in the same topic, which is optional.
+ optional string message_group = 17;
+
+ // Trace context for each message, which is optional.
+ optional string trace_context = 18;
+
+ // If a transactional message stay unresolved for more than
+ // `transaction_orphan_threshold`, it would be regarded as an
+ // orphan. Servers that manages orphan messages would pick up
+ // a capable publisher to resolve
+ optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+
+ // Information to identify whether this message is from dead letter queue.
+ optional DeadLetterQueue dead_letter_queue = 20;
+}
+
+message DeadLetterQueue {
+ // Original topic for this DLQ message.
+ string topic = 1;
+ // Original message id for this DLQ message.
+ string message_id = 2;
+}
+
+message Message {
+
+ Resource topic = 1;
+
+ // User defined key-value pairs.
+ // If user_properties contain the reserved keys by RocketMQ,
+ // the send message request will be aborted with status `INVALID_ARGUMENT`.
+ // See below links for the reserved keys
+ // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+ map<string, string> user_properties = 2;
+
+ SystemProperties system_properties = 3;
+
+ bytes body = 4;
+}
+
+message Assignment {
+ MessageQueue message_queue = 1;
+}
+
+enum Code {
+ CODE_UNSPECIFIED = 0;
+
+ // Generic code for success.
+ OK = 20000;
+
+ // Generic code for multiple return results.
+ MULTIPLE_RESULTS = 30000;
+
+ // Generic code for bad request, indicating that required fields or headers are missing.
+ BAD_REQUEST = 40000;
+ // Format of access point is illegal.
+ ILLEGAL_ACCESS_POINT = 40001;
+ // Format of topic is illegal.
+ ILLEGAL_TOPIC = 40002;
+ // Format of consumer group is illegal.
+ ILLEGAL_CONSUMER_GROUP = 40003;
+ // Format of message tag is illegal.
+ ILLEGAL_MESSAGE_TAG = 40004;
+ // Format of message key is illegal.
+ ILLEGAL_MESSAGE_KEY = 40005;
+ // Format of message group is illegal.
+ ILLEGAL_MESSAGE_GROUP = 40006;
+ // Format of message property key is illegal.
+ ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
+ // Transaction id is invalid.
+ INVALID_TRANSACTION_ID = 40008;
+ // Format of message id is illegal.
+ ILLEGAL_MESSAGE_ID = 40009;
+ // Format of filter expression is illegal.
+ ILLEGAL_FILTER_EXPRESSION = 40010;
+ // The invisible time of request is invalid.
+ ILLEGAL_INVISIBLE_TIME = 40011;
+ // The delivery timestamp of message is invalid.
+ ILLEGAL_DELIVERY_TIME = 40012;
+ // Receipt handle of message is invalid.
+ INVALID_RECEIPT_HANDLE = 40013;
+ // Message property conflicts with its type.
+ MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
+ // Client type could not be recognized.
+ UNRECOGNIZED_CLIENT_TYPE = 40015;
+ // Message is corrupted.
+ MESSAGE_CORRUPTED = 40016;
+ // Request is rejected due to missing of x-mq-client-id header.
+ CLIENT_ID_REQUIRED = 40017;
+ // Polling time is illegal.
+ ILLEGAL_POLLING_TIME = 40018;
+
+ // Generic code indicates that the client request lacks valid authentication
+ // credentials for the requested resource.
+ UNAUTHORIZED = 40100;
+
+ // Generic code indicates that the account is suspended due to overdue of payment.
+ PAYMENT_REQUIRED = 40200;
+
+ // Generic code for the case that user does not have the permission to operate.
+ FORBIDDEN = 40300;
+
+ // Generic code for resource not found.
+ NOT_FOUND = 40400;
+ // Message not found from server.
+ MESSAGE_NOT_FOUND = 40401;
+ // Topic resource does not exist.
+ TOPIC_NOT_FOUND = 40402;
+ // Consumer group resource does not exist.
+ CONSUMER_GROUP_NOT_FOUND = 40403;
+
+ // Generic code representing client side timeout when connecting to, reading data from, or write data to server.
+ REQUEST_TIMEOUT = 40800;
+
+ // Generic code represents that the request entity is larger than limits defined by server.
+ PAYLOAD_TOO_LARGE = 41300;
+ // Message body size exceeds the threshold.
+ MESSAGE_BODY_TOO_LARGE = 41301;
+
+ // Generic code for use cases where pre-conditions are not met.
+ // For example, if a producer instance is used to publish messages without prior start() invocation,
+ // this error code will be raised.
+ PRECONDITION_FAILED = 42800;
+
+ // Generic code indicates that too many requests are made in short period of duration.
+ // Requests are throttled.
+ TOO_MANY_REQUESTS = 42900;
+
+ // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+ // The request may be resubmitted after reducing the size of the request header fields.
+ REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+ // Message properties total size exceeds the threshold.
+ MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+
+ // Generic code indicates that server/client encountered an unexpected
+ // condition that prevented it from fulfilling the request.
+ INTERNAL_ERROR = 50000;
+ // Code indicates that the server encountered an unexpected condition
+ // that prevented it from fulfilling the request.
+ // This error response is a generic "catch-all" response.
+ // Usually, this indicates the server cannot find a better alternative
+ // error code to response. Sometimes, server administrators log error
+ // responses like the 500 status code with more details about the request
+ // to prevent the error from happening again in the future.
+ //
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+ INTERNAL_SERVER_ERROR = 50001;
+ // The HA-mechanism is not working now.
+ HA_NOT_AVAILABLE = 50002;
+
+ // Generic code means that the server or client does not support the
+ // functionality required to fulfill the request.
+ NOT_IMPLEMENTED = 50100;
+
+ // Generic code represents that the server, which acts as a gateway or proxy,
+ // does not get an satisfied response in time from its upstream servers.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+ PROXY_TIMEOUT = 50400;
+ // Message persistence timeout.
+ MASTER_PERSISTENCE_TIMEOUT = 50401;
+ // Slave persistence timeout.
+ SLAVE_PERSISTENCE_TIMEOUT = 50402;
+
+ // Generic code for unsupported operation.
+ UNSUPPORTED = 50500;
+ // Operation is not allowed in current version.
+ VERSION_UNSUPPORTED = 50501;
+ // Not allowed to verify message. Chances are that you are verifying
+ // a FIFO message, as is violating FIFO semantics.
+ VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+ // Generic code for failed message consumption.
+ FAILED_TO_CONSUME_MESSAGE = 60000;
+}
+
+message Status {
+ Code code = 1;
+ string message = 2;
+}
+
+enum Language {
+ LANGUAGE_UNSPECIFIED = 0;
+ JAVA = 1;
+ CPP = 2;
+ DOT_NET = 3;
+ GOLANG = 4;
+ RUST = 5;
+ PYTHON = 6;
+ PHP = 7;
+ NODE_JS = 8;
+ RUBY = 9;
+ OBJECTIVE_C = 10;
+ DART = 11;
+ KOTLIN = 12;
+}
+
+// User Agent
+message UA {
+ // SDK language
+ Language language = 1;
+
+ // SDK version
+ string version = 2;
+
+ // Platform details, including OS name, version, arch etc.
+ string platform = 3;
+
+ // Hostname of the node
+ string hostname = 4;
+}
+
+message Settings {
+ // Configurations for all clients.
+ optional ClientType client_type = 1;
+
+ optional Endpoints access_point = 2;
+
+ // If publishing of messages encounters throttling or server internal errors,
+ // publishers should implement automatic retries after progressive longer
+ // back-offs for consecutive errors.
+ //
+ // When processing message fails, `backoff_policy` describes an interval
+ // after which the message should be available to consume again.
+ //
+ // For FIFO messages, the interval should be relatively small because
+ // messages of the same message group would not be readily available until
+ // the prior one depletes its lifecycle.
+ optional RetryPolicy backoff_policy = 3;
+
+ // Request timeout for RPCs excluding long-polling.
+ optional google.protobuf.Duration request_timeout = 4;
+
+ oneof pub_sub {
+ Publishing publishing = 5;
+
+ Subscription subscription = 6;
+ }
+
+ // User agent details
+ UA user_agent = 7;
+
+ Metric metric = 8;
+}
+
+message Publishing {
+ // Publishing settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // List of topics to which messages will publish to.
+ repeated Resource topics = 1;
+
+ // If the message body size exceeds `max_body_size`, broker servers would
+ // reject the request. As a result, it is advisable that Producer performs
+ // client-side check validation.
+ int32 max_body_size = 2;
+
+ // When `validate_message_type` flag set `false`, no need to validate message's type
+ // with messageQueue's `accept_message_types` before publishing.
+ bool validate_message_type = 3;
+}
+
+message Subscription {
+ // Subscription settings below here is appointed by client, thus it is
+ // unnecessary for server to push at present.
+ //
+ // Consumer group.
+ optional Resource group = 1;
+
+ // Subscription for consumer.
+ repeated SubscriptionEntry subscriptions = 2;
+
+ // Subscription settings below here are from server, it is essential for
+ // server to push.
+ //
+ // When FIFO flag is `true`, messages of the same message group are processed
+ // in first-in-first-out manner.
+ //
+ // Brokers will not deliver further messages of the same group until prior
+ // ones are completely acknowledged.
+ optional bool fifo = 3;
+
+ // Message receive batch size here is essential for push consumer.
+ optional int32 receive_batch_size = 4;
+
+ // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+ // push consumer.
+ optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+ // Indicates that if client should export local metrics to server.
+ bool on = 1;
+
+ // The endpoint that client metrics should be exported to, which is required if the switch is on.
+ optional Endpoints endpoints = 2;
+}
+
+enum QueryOffsetPolicy {
+ // Use this option if client wishes to playback all existing messages.
+ BEGINNING = 0;
+
+ // Use this option if client wishes to skip all existing messages.
+ END = 1;
+
+ // Use this option if time-based seek is targeted.
+ TIMESTAMP = 2;
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/service.proto b/rust/proto/apache/rocketmq/v2/service.proto
new file mode 100644
index 00000000..f662f769
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/service.proto
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+ Resource topic = 1;
+ Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+ Status status = 1;
+
+ repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+ repeated Message messages = 1;
+}
+
+message SendResultEntry {
+ Status status = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ int64 offset = 4;
+}
+
+message SendMessageResponse {
+ Status status = 1;
+
+ // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+ // each entry for best certainty.
+ repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+ Resource topic = 1;
+ Resource group = 2;
+ Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+ Status status = 1;
+ repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ FilterExpression filter_expression = 3;
+ int32 batch_size = 4;
+ // Required if client type is simple consumer.
+ optional google.protobuf.Duration invisible_duration = 5;
+ // For message auto renew and clean
+ bool auto_renew = 6;
+ optional google.protobuf.Duration long_polling_timeout = 7;
+}
+
+message ReceiveMessageResponse {
+ oneof content {
+ Status status = 1;
+ Message message = 2;
+ // The timestamp that brokers start to deliver status line or message.
+ google.protobuf.Timestamp delivery_timestamp = 3;
+ }
+}
+
+message AckMessageEntry {
+ string message_id = 1;
+ string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+ string message_id = 1;
+ string receipt_handle = 2;
+
+ // Acknowledge result may be acquired through inspecting
+ // `status.code`; In case acknowledgement failed, `status.message`
+ // is the explanation of the failure.
+ Status status = 3;
+}
+
+message AckMessageResponse {
+
+ // RPC tier status, which is used to represent RPC-level errors including
+ // authentication, authorization, throttling and other general failures.
+ Status status = 1;
+
+ repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+ Resource group = 1;
+ Resource topic = 2;
+ string receipt_handle = 3;
+ string message_id = 4;
+ int32 delivery_attempt = 5;
+ int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
+
+message HeartbeatRequest {
+ optional Resource group = 1;
+ ClientType client_type = 2;
+}
+
+message HeartbeatResponse { Status status = 1; }
+
+message EndTransactionRequest {
+ Resource topic = 1;
+ string message_id = 2;
+ string transaction_id = 3;
+ TransactionResolution resolution = 4;
+ TransactionSource source = 5;
+ string trace_context = 6;
+}
+
+message EndTransactionResponse { Status status = 1; }
+
+message PrintThreadStackTraceCommand { string nonce = 1; }
+
+message ThreadStackTrace {
+ string nonce = 1;
+ optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+ string nonce = 1;
+ Message message = 2;
+}
+
+message VerifyMessageResult {
+ string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+ Message message = 1;
+ string transaction_id = 2;
+}
+
+message TelemetryCommand {
+ optional Status status = 1;
+
+ oneof command {
+ // Client settings
+ Settings settings = 2;
+
+ // These messages are from client.
+ //
+ // Report thread stack trace to server.
+ ThreadStackTrace thread_stack_trace = 3;
+
+ // Report message verify result to server.
+ VerifyMessageResult verify_message_result = 4;
+
+ // There messages are from server.
+ //
+ // Request client to recover the orphaned transaction message.
+ RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+ // Request client to print thread stack trace.
+ PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+ // Request client to verify the consumption of the appointed message.
+ VerifyMessageCommand verify_message_command = 7;
+ }
+}
+
+message NotifyClientTerminationRequest {
+ // Consumer group, which is absent for producer.
+ optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+ Resource group = 1;
+ Resource topic = 2;
+
+ // Unique receipt handle to identify message to change
+ string receipt_handle = 3;
+
+ // New invisible duration
+ google.protobuf.Duration invisible_duration = 4;
+
+ // For message tracing
+ string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+ Status status = 1;
+
+ // Server may generate a new receipt handle for the message.
+ string receipt_handle = 2;
+}
+
+message PullMessageRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ int64 offset = 3;
+ int32 batch_size = 4;
+ FilterExpression filter_expression = 5;
+ google.protobuf.Duration long_polling_timeout = 6;
+}
+
+message PullMessageResponse {
+ oneof content {
+ Status status = 1;
+ Message message = 2;
+ int64 next_offset = 3;
+ }
+}
+
+message UpdateOffsetRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+ int64 offset = 3;
+}
+
+message UpdateOffsetResponse {
+ Status status = 1;
+}
+
+message GetOffsetRequest {
+ Resource group = 1;
+ MessageQueue message_queue = 2;
+}
+
+message GetOffsetResponse {
+ Status status = 1;
+ int64 offset = 2;
+}
+
+message QueryOffsetRequest {
+ MessageQueue message_queue = 1;
+ QueryOffsetPolicy query_offset_policy = 2;
+ optional google.protobuf.Timestamp timestamp = 3;
+}
+
+message QueryOffsetResponse {
+ Status status = 1;
+ int64 offset = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors arise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+
+ // Queries the route entries of the requested topic in the perspective of the
+ // given endpoints. On success, servers should return a collection of
+ // addressable message-queues. Note servers may return customized route
+ // entries based on endpoints provided.
+ //
+ // If the requested topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+ rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
+
+ // Producer or consumer sends HeartbeatRequest to servers periodically to
+ // keep-alive. Additionally, it also reports client-side configuration,
+ // including topic subscription, load-balancing group name, etc.
+ //
+ // Returns `OK` if success.
+ //
+ // If a client specifies a language that is not yet supported by servers,
+ // returns `INVALID_ARGUMENT`
+ rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+
+ // Delivers messages to brokers.
+ // Clients may further:
+ // 1. Refine a message destination to message-queues which fulfills parts of
+ // FIFO semantic;
+ // 2. Flag a message as transactional, which keeps it invisible to consumers
+ // until it commits;
+ // 3. Time a message, making it invisible to consumers till specified
+ // time-point;
+ // 4. And more...
+ //
+ // Returns message-id or transaction-id with status `OK` on success.
+ //
+ // If the destination topic doesn't exist, returns `NOT_FOUND`.
+ rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+
+ // Queries the assigned route info of a topic for current consumer,
+ // the returned assignment result is decided by server-side load balancer.
+ //
+ // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+ // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+ rpc QueryAssignment(QueryAssignmentRequest) returns
(QueryAssignmentResponse) {
+ }
+
+ // Receives messages from the server in batch manner, returns a set of
+ // messages if success. The received messages should be acked or redelivered
+ // after processed.
+ //
+ // If the pending concurrent receive requests exceed the quota of the given
+ // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+ // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+ // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+ // message in the specific topic, returns `OK` with an empty message set.
+ // Please note that client may suffer from false empty responses.
+ //
+ // If failed to receive message from remote, server must return only one
+ // `ReceiveMessageResponse` as the reply to the request, whose `Status` indicates
+ // the specific reason of failure, otherwise, the reply is considered successful.
+ rpc ReceiveMessage(ReceiveMessageRequest) returns (stream
ReceiveMessageResponse) {
+ }
+
+ // Acknowledges the message associated with the `receipt_handle` or `offset`
+ // in the `AckMessageRequest`, it means the message has been successfully
+ // processed. Returns `OK` if the message server removes the relevant message
+ // successfully.
+ //
+ // If the given receipt_handle is illegal or out of date, returns
+ // `INVALID_ARGUMENT`.
+ rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+
+ // Forwards one message to dead letter queue if the max delivery attempts is
+ // exceeded by this message at client-side, return `OK` if success.
+ rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+ returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+ rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
+
+ rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+
+ rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
+
+ rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
+
+ // Commits or rollback one transactional message.
+ rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+
+ // Once a client starts, it would immediately establish bi-lateral stream
+ // RPCs with brokers, reporting its settings as the initiative command.
+ //
+ // When servers have need of inspecting client status, they would issue
+ // telemetry commands to clients. After executing received instructions,
+ // clients shall report command execution results through client-side streams.
+ rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
+
+ // Notify the server that the client is terminated.
+ rpc NotifyClientTermination(NotifyClientTerminationRequest) returns
(NotifyClientTerminationResponse) {
+ }
+
+ // Once a message is retrieved from consume queue on behalf of the group, it
+ // will be kept invisible to other clients of the same group for a period of
+ // time. The message is supposed to be processed within the invisible
+ // duration. If the client, which is in charge of the invisible message, is
+ // not capable of processing the message timely, it may use
+ // ChangeInvisibleDuration to lengthen invisible duration.
+ rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns
(ChangeInvisibleDurationResponse) {
+ }
+}
\ No newline at end of file
diff --git a/rust/src/client.rs b/rust/src/client.rs
index b2c77092..3b286fc4 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -731,7 +731,7 @@ impl SessionManager {
) -> Result<Session, ClientError> {
let mut session_map = self.session_map.lock().await;
let endpoint_url = endpoints.endpoint_url().to_string();
- return if session_map.contains_key(&endpoint_url) {
+ if session_map.contains_key(&endpoint_url) {
Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
} else {
let mut session = Session::new(
@@ -745,9 +745,8 @@ impl SessionManager {
let shadow_session = session.shadow_session();
session_map.insert(endpoint_url.clone(), session);
Ok(shadow_session)
- };
+ }
}
-
pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>,
ClientError> {
let session_map = self.session_map.lock().await;
let mut sessions = Vec::new();
@@ -864,23 +863,7 @@ pub(crate) mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn client_start() -> Result<(), ClientError> {
- let context = MockSession::new_context();
- context.expect().returning(|_, _, _, _| {
- let mut session = MockSession::default();
- session.expect_start().returning(|_, _| Ok(()));
- session
- .expect_shadow_session()
- .returning(|| MockSession::default());
- Ok(session)
- });
- let session_manager = new_session_manager();
- let mut client = new_client_with_session_manager(session_manager);
- let (tx, _) = mpsc::channel(16);
- client.start(tx).await?;
-
- // TODO use countdown latch instead sleeping
- // wait for run
- tokio::time::sleep(Duration::from_secs(1)).await;
+ // TODO: add grpc bi-stream test
Ok(())
}
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 55afd915..7e33e4c4 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -57,6 +57,9 @@ pub enum ErrorKind {
#[error("Failed to receive message via channel")]
ChannelReceive,
+ #[error("Grpc server unavailable, may be corrected by retrying")]
+ ServerUnavailable,
+
#[error("Unknown error")]
Unknown,
}
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 8c256f83..9401cb18 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -83,7 +83,7 @@ impl Endpoints {
let port_i32 = port.parse::<i32>().map_err(|e| {
ClientError::new(
ErrorKind::Config,
- &format!("port {} in endpoint url is invalid", port),
+ &format!("port {port} in endpoint url is invalid"),
Self::OPERATION_PARSE,
)
.with_context("url", endpoint_url)
diff --git a/rust/src/model/message_id.rs b/rust/src/model/message_id.rs
index cd718b77..9bc5ba3d 100644
--- a/rust/src/model/message_id.rs
+++ b/rust/src/model/message_id.rs
@@ -58,7 +58,6 @@ use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};
* (lower 4bytes)
* </pre>
*/
-
// inspired by https://github.com/messense/rocketmq-rs
pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> =
Lazy::new(|| {
let mut wtr = Vec::new();
diff --git a/rust/src/pb/apache.rocketmq.v2.rs
b/rust/src/pb/apache.rocketmq.v2.rs
index 2483e5d5..578895f9 100644
--- a/rust/src/pb/apache.rocketmq.v2.rs
+++ b/rust/src/pb/apache.rocketmq.v2.rs
@@ -1,3 +1,4 @@
+// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilterExpression {
@@ -89,9 +90,9 @@ pub struct Broker {
#[prost(int32, tag = "2")]
pub id: i32,
/// Address of the broker, complying with the following scheme
- /// 1. dns:\[//authority/]host[:port\]
- /// 2. ipv4:address\[:port][,address[:port],...\] – IPv4 addresses
- /// 3. ipv6:address\[:port][,address[:port],...\] – IPv6 addresses
+ /// 1. dns:\[//authority/\]host\[:port\]
+ /// 2. ipv4:address[:port][,address\[:port\],...] – IPv4 addresses
+ /// 3. ipv6:address[:port][,address\[:port\],...] – IPv6 addresses
#[prost(message, optional, tag = "3")]
pub endpoints: ::core::option::Option<Endpoints>,
}
diff --git a/rust/src/session.rs b/rust/src/session.rs
index d16a70d8..ebd0687b 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -110,6 +110,7 @@ pub(crate) struct Session {
endpoints: Endpoints,
stub: MessagingServiceClient<Channel>,
telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
+ telemetry_command_tx: Option<mpsc::Sender<Command>>,
shutdown_tx: Option<oneshot::Sender<()>>,
}
@@ -127,6 +128,7 @@ impl Session {
endpoints: self.endpoints.clone(),
stub: self.stub.clone(),
telemetry_tx: self.telemetry_tx.clone(),
+ telemetry_command_tx: self.telemetry_command_tx.clone(),
shutdown_tx: None,
}
}
@@ -179,6 +181,7 @@ impl Session {
client_id,
stub,
telemetry_tx: None,
+ telemetry_command_tx: None,
shutdown_tx: None,
})
}
@@ -272,8 +275,7 @@ impl Session {
let signature = hmac::sign(&key, date_time.as_bytes());
let signature = hex::encode(signature.as_ref());
let authorization = format!(
- "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time,
Signature={}",
- access_key, signature
+ "MQv2-HMAC-SHA1 Credential={access_key},
SignedHeaders=x-mq-date-time, Signature={signature}"
);
metadata.insert(
"authorization",
@@ -287,6 +289,31 @@ impl Session {
settings: TelemetryCommand,
telemetry_command_tx: mpsc::Sender<Command>,
) -> Result<(), ClientError> {
+ self.telemetry_command_tx = Some(telemetry_command_tx);
+
+ self.establish_telemetry_stream(settings).await?;
+
+ debug!(
+ self.logger,
+ "telemetry_command_tx: {:?}", self.telemetry_command_tx
+ );
+ info!(self.logger, "starting client success");
+ Ok(())
+ }
+
+ async fn establish_telemetry_stream(
+ &mut self,
+ settings: TelemetryCommand,
+ ) -> Result<(), ClientError> {
+ if let Some(old_shutdown_tx) = self.shutdown_tx.take() {
+ // a `send` error means the receiver is already gone, which is fine.
+ let _ = old_shutdown_tx.send(());
+ info!(
+ self.logger,
+ "sent shutdown signal to the previous telemetry stream."
+ );
+ }
+
let (tx, rx) = mpsc::channel(16);
tx.send(settings).await.map_err(|e| {
ClientError::new(
@@ -309,9 +336,12 @@ impl Session {
})?;
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
self.shutdown_tx = Some(shutdown_tx);
+ self.telemetry_tx = Some(tx);
let logger = self.logger.clone();
+ let telemetry_command_tx =
self.telemetry_command_tx.as_ref().unwrap().clone();
tokio::spawn(async move {
let mut stream = response.into_inner();
loop {
@@ -330,6 +360,7 @@ impl Session {
}
Err(e) => {
error!(logger, "telemetry response error:
{:?}", e);
+ break;
}
}
}
@@ -339,9 +370,10 @@ impl Session {
}
}
}
+ // telemetry tx will be dropped here
});
- let _ = self.telemetry_tx.insert(tx);
debug!(self.logger, "start session success");
+
Ok(())
}
@@ -352,24 +384,39 @@ impl Session {
}
pub(crate) fn is_started(&self) -> bool {
- self.shutdown_tx.is_some()
+ self.telemetry_tx.is_some() &&
!self.telemetry_tx.as_ref().unwrap().is_closed()
}
pub(crate) async fn update_settings(
&mut self,
settings: TelemetryCommand,
) -> Result<(), ClientError> {
- if let Some(tx) = self.telemetry_tx.as_ref() {
- tx.send(settings).await.map_err(|e| {
- ClientError::new(
- ErrorKind::ChannelSend,
- "failed to send telemetry command",
- OPERATION_UPDATE_SETTINGS,
- )
- .set_source(e)
- })?;
+ if self.is_started() {
+ debug!(
+ self.logger,
+ "session is already started: {:?}", self.telemetry_tx
+ );
+ if let Some(tx) = self.telemetry_tx.as_ref() {
+ tx.send(settings).await.map_err(|e| {
+ ClientError::new(
+ ErrorKind::ChannelSend,
+ "failed to send telemetry command",
+ OPERATION_UPDATE_SETTINGS,
+ )
+ .set_source(e)
+ })?;
+ }
+ Ok(())
+ } else {
+ debug!(self.logger, "session is closed: {:?}", self.telemetry_tx);
+ self.establish_telemetry_stream(settings).await?;
+ debug!(
+ self.logger,
+ "session is established, closed: {:?}",
+ self.telemetry_tx.as_ref().unwrap().is_closed()
+ );
+ Ok(())
}
- Ok(())
}
}
@@ -396,14 +443,24 @@ impl RPCClient for Session {
request: HeartbeatRequest,
) -> Result<HeartbeatResponse, ClientError> {
let request = self.sign(request);
- let response = self.stub.heartbeat(request).await.map_err(|e| {
- ClientError::new(
- ErrorKind::ClientInternal,
- "send rpc heartbeat failed",
- OPERATION_HEARTBEAT,
- )
- .set_source(e)
- })?;
+ let response = self
+ .stub
+ .heartbeat(request)
+ .await
+ .map_err(|e| match e.code() {
+ tonic::Code::Unavailable => ClientError::new(
+ ErrorKind::ServerUnavailable,
+ "server unavailable",
+ OPERATION_HEARTBEAT,
+ )
+ .set_source(e),
+ _ => ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc heartbeat failed",
+ OPERATION_HEARTBEAT,
+ )
+ .set_source(e),
+ })?;
Ok(response.into_inner())
}
@@ -434,13 +491,19 @@ impl RPCClient for Session {
.stub
.receive_message(request)
.await
- .map_err(|e| {
- ClientError::new(
+ .map_err(|e| match e.code() {
+ tonic::Code::Unavailable => ClientError::new(
+ ErrorKind::ServerUnavailable,
+ "server unavailable",
+ OPERATION_RECEIVE_MESSAGE,
+ )
+ .set_source(e),
+ _ => ClientError::new(
ErrorKind::ClientInternal,
"send rpc receive_message failed",
OPERATION_RECEIVE_MESSAGE,
)
- .set_source(e)
+ .set_source(e),
})?
.into_inner();
@@ -640,6 +703,7 @@ mock! {
#[cfg(test)]
mod tests {
+ use mockall::predicate::always;
use slog::debug;
use wiremock_grpc::generate;
@@ -684,35 +748,6 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn session_start() {
- let mut server = RocketMQMockServer::start_default().await;
- server.setup(
- MockBuilder::when()
- // 👇 RPC prefix
- .path("/apache.rocketmq.v2.MessagingService/Telemetry")
- .then()
- .return_status(Code::Ok),
- );
-
- let logger = terminal_logger();
- let mut client_option = ClientOption::default();
- client_option.set_enable_tls(false);
- let session = Session::new(
- &logger,
- &Endpoints::from_url(&format!("localhost:{}",
server.address().port())).unwrap(),
- "test_client".to_string(),
- &client_option,
- )
- .await;
- debug!(logger, "session: {:?}", session);
- assert!(session.is_ok());
-
- let mut session = session.unwrap();
-
- let (tx, _) = mpsc::channel(16);
- let result = session
- .start(build_producer_settings(&ProducerOption::default()), tx)
- .await;
- assert!(result.is_ok());
- assert!(session.is_started());
+ // TODO: add grpc bi-stream test
}
}