This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 008b4553 [ISSUE #903] [Rust] Fix sync settings show an error after the 
connection is closed by the server side (#1025)
008b4553 is described below

commit 008b455302a9ad61f61eb1ea42f954e5718b54e3
Author: EnableAsync <[email protected]>
AuthorDate: Thu Jul 3 16:28:53 2025 +0800

    [ISSUE #903] [Rust] Fix sync settings show an error after the connection is 
closed by the server side (#1025)
---
 .github/workflows/java_coverage.yml            |   4 +-
 .github/workflows/rust_build.yml               |  20 +-
 .github/workflows/rust_coverage.yaml           |   3 +-
 protos                                         |   2 +-
 rust/Cargo.toml                                |   6 +-
 rust/build.rs                                  |  13 +-
 rust/proto/apache/rocketmq/v2/admin.proto      |  43 ++
 rust/proto/apache/rocketmq/v2/definition.proto | 564 +++++++++++++++++++++++++
 rust/proto/apache/rocketmq/v2/service.proto    | 411 ++++++++++++++++++
 rust/src/client.rs                             |  23 +-
 rust/src/error.rs                              |   3 +
 rust/src/model/common.rs                       |   2 +-
 rust/src/model/message_id.rs                   |   1 -
 rust/src/pb/apache.rocketmq.v2.rs              |   7 +-
 rust/src/session.rs                            | 145 ++++---
 15 files changed, 1144 insertions(+), 103 deletions(-)

diff --git a/.github/workflows/java_coverage.yml 
b/.github/workflows/java_coverage.yml
index cd9289d7..dc56e63a 100644
--- a/.github/workflows/java_coverage.yml
+++ b/.github/workflows/java_coverage.yml
@@ -26,9 +26,9 @@ jobs:
           paths: "**/TEST-*.xml"
         if: always()
       - name: Upload to Codecov
-        uses: codecov/codecov-action@v3
+        uses: codecov/codecov-action@v4
         with:
           flags: java
           fail_ci_if_error: true
-          token: e7eb01be-398b-4f7f-a73e-dc35c428cb50
+          token: ${{ secrets.CODECOV_ACTION_KEY }}
           verbose: true
diff --git a/.github/workflows/rust_build.yml b/.github/workflows/rust_build.yml
index 90e48ced..ea013939 100644
--- a/.github/workflows/rust_build.yml
+++ b/.github/workflows/rust_build.yml
@@ -3,7 +3,7 @@ on:
   workflow_call:
 jobs:
   fmt:
-    name: code style check    
+    name: Code style check    
     runs-on: ubuntu-latest
     defaults:
       run:
@@ -20,7 +20,7 @@ jobs:
       - name: Code format check
         run: cargo fmt --check
   clippy:
-    name: clippy
+    name: Clippy
     runs-on: ubuntu-latest
     defaults:
       run:
@@ -35,10 +35,10 @@ jobs:
           toolchain: stable
           components: clippy
       # Run clippy
-      - name: clippy check
+      - name: Clippy check
         run: cargo clippy --all-features -- -D warnings
   doc:
-    name: doc check    
+    name: Doc check    
     runs-on: ubuntu-latest
     defaults:
       run:
@@ -62,7 +62,7 @@ jobs:
         working-directory: ./rust
     strategy:
       matrix:
-        msrv: [1.70.0]
+        msrv: [1.74.0]
     name: MSRV ${{ matrix.msrv }} check
     steps:
       - uses: actions/checkout@v3
@@ -71,7 +71,7 @@ jobs:
       - name: Install cargo-msrv
         run: |
           cargo install cargo-msrv
-          cargo msrv --min ${{ matrix.msrv }}
+          cargo msrv verify --min ${{ matrix.msrv }}
   build:
     name: "${{ matrix.os }}"
     runs-on: ${{ matrix.os }}
@@ -82,8 +82,8 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-20.04, macos-12, windows-2022]
-        msrv: [1.70]
+        os: [Ubuntu-22.04, MacOS-15, Windows-2022]
+        msrv: [1.74]
     steps:
       - uses: actions/checkout@v2
         with:
@@ -93,9 +93,9 @@ jobs:
           toolchain: ${{ matrix.msrv }}
           components: clippy
       - name: Install protoc
-        uses: arduino/setup-protoc@v1
+        uses: arduino/setup-protoc@v3
         with:
-          version: "3.x"
+          version: "29.x"
           repo-token: ${{ secrets.GITHUB_TOKEN }}
       - name: Build
         env:
diff --git a/.github/workflows/rust_coverage.yaml 
b/.github/workflows/rust_coverage.yaml
index 85cbebd0..ad8ab749 100644
--- a/.github/workflows/rust_coverage.yaml
+++ b/.github/workflows/rust_coverage.yaml
@@ -33,9 +33,10 @@ jobs:
         run: cargo llvm-cov --all-features --workspace --ignore-filename-regex 
pb/ --codecov --output-path codecov.json
 
       - name: Upload to codecov.io
-        uses: codecov/codecov-action@v3
+        uses: codecov/codecov-action@v4
         with:
           files: ./rust/codecov.json
           flags: rust
           verbose: true
+          token: ${{ secrets.CODECOV_ACTION_KEY }}
           fail_ci_if_error: true
diff --git a/protos b/protos
index df8f85cd..5c9f8419 160000
--- a/protos
+++ b/protos
@@ -1 +1 @@
-Subproject commit df8f85cdfa850b204adea777076c2d3d8fb0c3fc
+Subproject commit 5c9f84199bffa79b2ed73beb37774ca92e749c19
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index a5f4dcb3..692ab3f5 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -18,7 +18,7 @@
 name = "rocketmq"
 version = "5.0.0"
 edition = "2021"
-rust-version = "1.70"
+rust-version = "1.74"
 authors = [
     "SSpirits <[email protected]>",
     "Zhanhui Li <[email protected]>",
@@ -69,8 +69,8 @@ ring = "0.16.20"
 tokio-util = { version = "=0.7.10", features = ["rt"] }
 
 [build-dependencies]
-tonic-build = "0.9.0"
-which = "4.4.0"
+tonic-build = "0.10.2"
+which = "7.0.3"
 version_check = "0.9.4"
 regex = "1.7.3"
 
diff --git a/rust/build.rs b/rust/build.rs
index 90a42557..4d52371d 100644
--- a/rust/build.rs
+++ b/rust/build.rs
@@ -35,12 +35,13 @@ fn main() {
         .out_dir("src/pb")
         .compile(
             &[
-                "../protos/apache/rocketmq/v2/service.proto",
-                "../protos/apache/rocketmq/v2/admin.proto",
+                "./proto/apache/rocketmq/v2/service.proto",
+                "./proto/apache/rocketmq/v2/admin.proto",
+                "./proto/apache/rocketmq/v2/definition.proto",
             ],
-            &["../protos"],
+            &["./proto"],
         )
-        .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
+        .unwrap_or_else(|e| panic!("Failed to compile proto {e:?}"));
 }
 
 fn check_protoc_version() {
@@ -57,12 +58,12 @@ fn check_protoc_version() {
     let result = cmd.output();
 
     if result.is_err() {
-        panic!("failed to invoke protoc: {:?}", result)
+        panic!("failed to invoke protoc: {result:?}")
     }
 
     let output = result.unwrap();
     if !output.status.success() {
-        panic!("protoc failed: {:?}", output)
+        panic!("protoc failed: {output:?}")
     }
 
     let version_regex = 
Regex::new(r"(?:(\d+)\.)?(?:(\d+)\.)?(\*|\d+)").unwrap();
diff --git a/rust/proto/apache/rocketmq/v2/admin.proto 
b/rust/proto/apache/rocketmq/v2/admin.proto
new file mode 100644
index 00000000..7dbb7027
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/admin.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
+
+message ChangeLogLevelRequest {
+  enum Level {
+    TRACE = 0;
+    DEBUG = 1;
+    INFO = 2;
+    WARN = 3;
+    ERROR = 4;
+  }
+  Level level = 1;
+}
+
+message ChangeLogLevelResponse { string remark = 1; }
+
+service Admin {
+  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/definition.proto 
b/rust/proto/apache/rocketmq/v2/definition.proto
new file mode 100644
index 00000000..753bfceb
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+  TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+  COMMIT = 1;
+  ROLLBACK = 2;
+}
+
+enum TransactionSource {
+  SOURCE_UNSPECIFIED = 0;
+  SOURCE_CLIENT = 1;
+  SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+  PERMISSION_UNSPECIFIED = 0;
+  NONE = 1;
+  READ = 2;
+  WRITE = 3;
+  READ_WRITE = 4;
+}
+
+enum FilterType {
+  FILTER_TYPE_UNSPECIFIED = 0;
+  TAG = 1;
+  SQL = 2;
+}
+
+message FilterExpression {
+  FilterType type = 1;
+  string expression = 2;
+}
+
+message RetryPolicy {
+  int32 max_attempts = 1;
+  oneof strategy {
+    ExponentialBackoff exponential_backoff = 2;
+    CustomizedBackoff customized_backoff = 3;
+  }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+  google.protobuf.Duration initial = 1;
+  google.protobuf.Duration max = 2;
+  float multiplier = 3;
+}
+
+message CustomizedBackoff {
+  // To support classic backoff strategy which is arbitrary defined by end 
users.
+  // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 
1h 2h`
+  repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+  string resource_namespace = 1;
+
+  // Resource name identifier, which remains unique within the abstract 
resource
+  // namespace.
+  string name = 2;
+}
+
+message SubscriptionEntry {
+  Resource topic = 1;
+  FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+  ADDRESS_SCHEME_UNSPECIFIED = 0;
+  IPv4 = 1;
+  IPv6 = 2;
+  DOMAIN_NAME = 3;
+}
+
+message Address {
+  string host = 1;
+  int32 port = 2;
+}
+
+message Endpoints {
+  AddressScheme scheme = 1;
+  repeated Address addresses = 2;
+}
+
+message Broker {
+  // Name of the broker
+  string name = 1;
+
+  // Broker index. Canonically, index = 0 implies that the broker is playing
+  // leader role while brokers with index > 0 play follower role.
+  int32 id = 2;
+
+  // Address of the broker, complying with the following scheme
+  // 1. dns:[//authority/]host[:port]
+  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+  Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+  Resource topic = 1;
+  int32 id = 2;
+  Permission permission = 3;
+  Broker broker = 4;
+  repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+  MESSAGE_TYPE_UNSPECIFIED = 0;
+
+  NORMAL = 1;
+
+  // Sequenced message
+  FIFO = 2;
+
+  // Messages that are delivered after the specified duration.
+  DELAY = 3;
+
+  // Messages that are transactional. Only committed messages are delivered to
+  // subscribers.
+  TRANSACTION = 4;
+}
+
+enum DigestType {
+  DIGEST_TYPE_UNSPECIFIED = 0;
+
+  // CRC algorithm achieves goal of detecting random data error with lowest
+  // computation overhead.
+  CRC32 = 1;
+
+  // MD5 algorithm achieves good balance between collision rate and computation
+  // overhead.
+  MD5 = 2;
+
+  // SHA-family has substantially fewer collision with fair amount of
+  // computation.
+  SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest were detected, brokers need
+// respond client with BAD_REQUEST.
+//
+// For messages subscription, when an invalid digest were detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+message Digest {
+  DigestType type = 1;
+  string checksum = 2;
+}
+
+enum ClientType {
+  CLIENT_TYPE_UNSPECIFIED = 0;
+  PRODUCER = 1;
+  PUSH_CONSUMER = 2;
+  SIMPLE_CONSUMER = 3;
+  PULL_CONSUMER = 4;
+}
+
+enum Encoding {
+  ENCODING_UNSPECIFIED = 0;
+
+  IDENTITY = 1;
+
+  GZIP = 2;
+}
+
+message SystemProperties {
+  // Tag, which is optional.
+  optional string tag = 1;
+
+  // Message keys
+  repeated string keys = 2;
+
+  // Message identifier, client-side generated, remains unique.
+  // if message_id is empty, the send message request will be aborted with
+  // status `INVALID_ARGUMENT`
+  string message_id = 3;
+
+  // Message body digest
+  Digest body_digest = 4;
+
+  // Message body encoding. Candidate options are identity, gzip, snappy etc.
+  Encoding body_encoding = 5;
+
+  // Message type, normal, FIFO or transactional.
+  MessageType message_type = 6;
+
+  // Message born time-point.
+  google.protobuf.Timestamp born_timestamp = 7;
+
+  // Message born host. Valid options are IPv4, IPv6 or client host domain 
name.
+  string born_host = 8;
+
+  // Time-point at which the message is stored in the broker, which is absent
+  // for message publishing.
+  optional google.protobuf.Timestamp store_timestamp = 9;
+
+  // The broker that stores this message. It may be broker name, IP or 
arbitrary
+  // identifier that uniquely identify the server.
+  string store_host = 10;
+
+  // Time-point at which broker delivers to clients, which is optional.
+  optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+  // If a message is acquired by way of POP, this field holds the receipt,
+  // which is absent for message publishing.
+  // Clients use the receipt to acknowledge or negatively acknowledge the
+  // message.
+  optional string receipt_handle = 12;
+
+  // Message queue identifier in which a message is physically stored.
+  int32 queue_id = 13;
+
+  // Message-queue offset at which a message is stored, which is absent for
+  // message publishing.
+  optional int64 queue_offset = 14;
+
+  // Period of time servers would remain invisible once a message is acquired.
+  optional google.protobuf.Duration invisible_duration = 15;
+
+  // Business code may failed to process messages for the moment. Hence, 
clients
+  // may request servers to deliver them again using certain back-off strategy,
+  // the attempt is 1 not 0 if message is delivered first time, and it is 
absent
+  // for message publishing.
+  optional int32 delivery_attempt = 16;
+
+  // Define the group name of message in the same topic, which is optional.
+  optional string message_group = 17;
+
+  // Trace context for each message, which is optional.
+  optional string trace_context = 18;
+
+  // If a transactional message stay unresolved for more than
+  // `transaction_orphan_threshold`, it would be regarded as an
+  // orphan. Servers that manages orphan messages would pick up
+  // a capable publisher to resolve
+  optional google.protobuf.Duration orphaned_transaction_recovery_duration = 
19;
+
+  // Information to identify whether this message is from dead letter queue.
+  optional DeadLetterQueue dead_letter_queue = 20;
+}
+
+message DeadLetterQueue {
+  // Original topic for this DLQ message.
+  string topic = 1;
+  // Original message id for this DLQ message.
+  string message_id = 2;
+}
+
+message Message {
+
+  Resource topic = 1;
+
+  // User defined key-value pairs.
+  // If user_properties contain the reserved keys by RocketMQ,
+  // the send message request will be aborted with status `INVALID_ARGUMENT`.
+  // See below links for the reserved keys
+  // 
https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+  map<string, string> user_properties = 2;
+
+  SystemProperties system_properties = 3;
+
+  bytes body = 4;
+}
+
+message Assignment {
+  MessageQueue message_queue = 1;
+}
+
+enum Code {
+  CODE_UNSPECIFIED = 0;
+
+  // Generic code for success.
+  OK = 20000;
+
+  // Generic code for multiple return results.
+  MULTIPLE_RESULTS = 30000;
+
+  // Generic code for bad request, indicating that required fields or headers 
are missing.
+  BAD_REQUEST = 40000;
+  // Format of access point is illegal.
+  ILLEGAL_ACCESS_POINT = 40001;
+  // Format of topic is illegal.
+  ILLEGAL_TOPIC = 40002;
+  // Format of consumer group is illegal.
+  ILLEGAL_CONSUMER_GROUP = 40003;
+  // Format of message tag is illegal.
+  ILLEGAL_MESSAGE_TAG = 40004;
+  // Format of message key is illegal.
+  ILLEGAL_MESSAGE_KEY = 40005;
+  // Format of message group is illegal.
+  ILLEGAL_MESSAGE_GROUP = 40006;
+  // Format of message property key is illegal.
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
+  // Transaction id is invalid.
+  INVALID_TRANSACTION_ID = 40008;
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 40009;
+  // Format of filter expression is illegal.
+  ILLEGAL_FILTER_EXPRESSION = 40010;
+  // The invisible time of request is invalid.
+  ILLEGAL_INVISIBLE_TIME = 40011;
+  // The delivery timestamp of message is invalid.
+  ILLEGAL_DELIVERY_TIME = 40012;
+  // Receipt handle of message is invalid.
+  INVALID_RECEIPT_HANDLE = 40013;
+  // Message property conflicts with its type.
+  MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
+  // Client type could not be recognized.
+  UNRECOGNIZED_CLIENT_TYPE = 40015;
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 40016;
+  // Request is rejected due to missing of x-mq-client-id header.
+  CLIENT_ID_REQUIRED = 40017;
+  // Polling time is illegal.
+  ILLEGAL_POLLING_TIME = 40018;
+
+  // Generic code indicates that the client request lacks valid authentication
+  // credentials for the requested resource.
+  UNAUTHORIZED = 40100;
+
+  // Generic code indicates that the account is suspended due to overdue of 
payment.
+  PAYMENT_REQUIRED = 40200;
+
+  // Generic code for the case that user does not have the permission to 
operate.
+  FORBIDDEN = 40300;
+
+  // Generic code for resource not found.
+  NOT_FOUND = 40400;
+  // Message not found from server.
+  MESSAGE_NOT_FOUND = 40401;
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 40402;
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 40403;
+
+  // Generic code representing client side timeout when connecting to, reading 
data from, or write data to server.
+  REQUEST_TIMEOUT = 40800;
+
+  // Generic code represents that the request entity is larger than limits 
defined by server.
+  PAYLOAD_TOO_LARGE = 41300;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 41301;
+
+  // Generic code for use cases where pre-conditions are not met.
+  // For example, if a producer instance is used to publish messages without 
prior start() invocation,
+  // this error code will be raised.
+  PRECONDITION_FAILED = 42800;
+
+  // Generic code indicates that too many requests are made in short period of 
duration.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 42900;
+
+  // Generic code for the case that the server is unwilling to process the 
request because its header fields are too large.
+  // The request may be resubmitted after reducing the size of the request 
header fields.
+  REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+
+  // Generic code indicates that server/client encountered an unexpected
+  // condition that prevented it from fulfilling the request.
+  INTERNAL_ERROR = 50000;
+  // Code indicates that the server encountered an unexpected condition
+  // that prevented it from fulfilling the request.
+  // This error response is a generic "catch-all" response.
+  // Usually, this indicates the server cannot find a better alternative
+  // error code to response. Sometimes, server administrators log error
+  // responses like the 500 status code with more details about the request
+  // to prevent the error from happening again in the future.
+  //
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+  INTERNAL_SERVER_ERROR = 50001;
+  // The HA-mechanism is not working now.
+  HA_NOT_AVAILABLE = 50002;
+
+  // Generic code means that the server or client does not support the
+  // functionality required to fulfill the request.
+  NOT_IMPLEMENTED = 50100;
+
+  // Generic code represents that the server, which acts as a gateway or proxy,
+  // does not get an satisfied response in time from its upstream servers.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+  PROXY_TIMEOUT = 50400;
+  // Message persistence timeout.
+  MASTER_PERSISTENCE_TIMEOUT = 50401;
+  // Slave persistence timeout.
+  SLAVE_PERSISTENCE_TIMEOUT = 50402;
+
+  // Generic code for unsupported operation.
+  UNSUPPORTED = 50500;
+  // Operation is not allowed in current version.
+  VERSION_UNSUPPORTED = 50501;
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, as is violating FIFO semantics.
+  VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+  // Generic code for failed message consumption.
+  FAILED_TO_CONSUME_MESSAGE = 60000;
+}
+
+message Status {
+  Code code = 1;
+  string message = 2;
+}
+
+enum Language {
+  LANGUAGE_UNSPECIFIED = 0;
+  JAVA = 1;
+  CPP = 2;
+  DOT_NET = 3;
+  GOLANG = 4;
+  RUST = 5;
+  PYTHON = 6;
+  PHP = 7;
+  NODE_JS = 8;
+  RUBY = 9;
+  OBJECTIVE_C = 10;
+  DART = 11;
+  KOTLIN = 12;
+}
+
+// User Agent
+message UA {
+  // SDK language
+  Language language = 1;
+
+  // SDK version
+  string version = 2;
+
+  // Platform details, including OS name, version, arch etc.
+  string platform = 3;
+
+  // Hostname of the node
+  string hostname = 4;
+}
+
+message Settings {
+  // Configurations for all clients.
+  optional ClientType client_type = 1;
+
+  optional Endpoints access_point = 2;
+
+  // If publishing of messages encounters throttling or server internal errors,
+  // publishers should implement automatic retries after progressive longer
+  // back-offs for consecutive errors.
+  //
+  // When processing message fails, `backoff_policy` describes an interval
+  // after which the message should be available to consume again.
+  //
+  // For FIFO messages, the interval should be relatively small because
+  // messages of the same message group would not be readily available until
+  // the prior one depletes its lifecycle.
+  optional RetryPolicy backoff_policy = 3;
+
+  // Request timeout for RPCs excluding long-polling.
+  optional google.protobuf.Duration request_timeout = 4;
+
+  oneof pub_sub {
+    Publishing publishing = 5;
+
+    Subscription subscription = 6;
+  }
+
+  // User agent details
+  UA user_agent = 7;
+
+  Metric metric = 8;
+}
+
+message Publishing {
+  // Publishing settings below here is appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // List of topics to which messages will publish to.
+  repeated Resource topics = 1;
+
+  // If the message body size exceeds `max_body_size`, broker servers would
+  // reject the request. As a result, it is advisable that Producer performs
+  // client-side check validation.
+  int32 max_body_size = 2;
+
+  // When `validate_message_type` flag set `false`, no need to validate 
message's type
+  // with messageQueue's `accept_message_types` before publishing.
+  bool validate_message_type = 3;
+}
+
+message Subscription {
+  // Subscription settings below here is appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // Consumer group.
+  optional Resource group = 1;
+
+  // Subscription for consumer.
+  repeated SubscriptionEntry subscriptions = 2;
+
+  // Subscription settings below here are from server, it is essential for
+  // server to push.
+  //
+  // When FIFO flag is `true`, messages of the same message group are processed
+  // in first-in-first-out manner.
+  //
+  // Brokers will not deliver further messages of the same group until prior
+  // ones are completely acknowledged.
+  optional bool fifo = 3;
+
+  // Message receive batch size here is essential for push consumer.
+  optional int32 receive_batch_size = 4;
+
+  // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+  // push consumer.
+  optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+  // Indicates that if client should export local metrics to server.
+  bool on = 1;
+
+  // The endpoint that client metrics should be exported to, which is required 
if the switch is on.
+  optional Endpoints endpoints = 2;
+}
+
+enum QueryOffsetPolicy {
+  // Use this option if client wishes to playback all existing messages.
+  BEGINNING = 0;
+
+  // Use this option if client wishes to skip all existing messages.
+  END = 1;
+
+  // Use this option if time-based seek is targeted.
+  TIMESTAMP = 2;
+}
\ No newline at end of file
diff --git a/rust/proto/apache/rocketmq/v2/service.proto 
b/rust/proto/apache/rocketmq/v2/service.proto
new file mode 100644
index 00000000..f662f769
--- /dev/null
+++ b/rust/proto/apache/rocketmq/v2/service.proto
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+  Resource topic = 1;
+  Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+  Status status = 1;
+
+  repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+  repeated Message messages = 1;
+}
+
+message SendResultEntry {
+  Status status = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  int64 offset = 4;
+}
+
+message SendMessageResponse {
+  Status status = 1;
+
+  // Some implementation may have partial failure issues. Client SDK 
developers are expected to inspect
+  // each entry for best certainty.
+  repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+  Resource topic = 1;
+  Resource group = 2;
+  Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+  Status status = 1;
+  repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  FilterExpression filter_expression = 3;
+  int32 batch_size = 4;
+  // Required if client type is simple consumer.
+  optional google.protobuf.Duration invisible_duration = 5;
+  // For message auto renew and clean
+  bool auto_renew = 6;
+  optional google.protobuf.Duration long_polling_timeout = 7;
+}
+
+message ReceiveMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    // The timestamp that brokers start to deliver status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
+  }
+}
+
+message AckMessageEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+
+  // Acknowledge result may be acquired through inspecting
+  // `status.code`; In case acknowledgement failed, `status.message`
+  // is the explanation of the failure.
+  Status status = 3;
+}
+
+message AckMessageResponse {
+
+  // RPC tier status, which is used to represent RPC-level errors including
+  // authentication, authorization, throttling and other general failures.
+  Status status = 1;
+
+  repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  string receipt_handle = 3;
+  string message_id = 4;
+  int32 delivery_attempt = 5;
+  int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
+
+message HeartbeatRequest {
+  optional Resource group = 1;
+  ClientType client_type = 2;
+}
+
+message HeartbeatResponse { Status status = 1; }
+
+message EndTransactionRequest {
+  Resource topic = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  TransactionResolution resolution = 4;
+  TransactionSource source = 5;
+  string trace_context = 6;
+}
+
+message EndTransactionResponse { Status status = 1; }
+
+message PrintThreadStackTraceCommand { string nonce = 1; }
+
+message ThreadStackTrace {
+  string nonce = 1;
+  optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+  string nonce = 1;
+  Message message = 2;
+}
+
+message VerifyMessageResult {
+  string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+  Message message = 1;
+  string transaction_id = 2;
+}
+
+message TelemetryCommand {
+  optional Status status = 1;
+
+  oneof command {
+    // Client settings
+    Settings settings = 2;
+
+    // These messages are from client.
+    //
+    // Report thread stack trace to server.
+    ThreadStackTrace thread_stack_trace = 3;
+
+    // Report message verify result to server.
+    VerifyMessageResult verify_message_result = 4;
+
+    // There messages are from server.
+    //
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+    // Request client to print thread stack trace.
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+    // Request client to verify the consumption of the appointed message.
+    VerifyMessageCommand verify_message_command = 7;
+  }
+}
+
+message NotifyClientTerminationRequest {
+  // Consumer group, which is absent for producer.
+  optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+  Resource group = 1;
+  Resource topic = 2;
+
+  // Unique receipt handle to identify message to change
+  string receipt_handle = 3;
+
+  // New invisible duration
+  google.protobuf.Duration invisible_duration = 4;
+
+  // For message tracing
+  string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+  Status status = 1;
+
+  // Server may generate a new receipt handle for the message.
+  string receipt_handle = 2;
+}
+
+message PullMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  int64 offset = 3;
+  int32 batch_size = 4;
+  FilterExpression filter_expression = 5;
+  google.protobuf.Duration long_polling_timeout = 6;
+}
+
+message PullMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    int64 next_offset = 3;
+  }
+}
+
+message UpdateOffsetRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  int64 offset = 3;
+}
+
+message UpdateOffsetResponse {
+  Status status = 1;
+}
+
+message GetOffsetRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+}
+
+message GetOffsetResponse {
+  Status status = 1;
+  int64 offset = 2;
+}
+
+message QueryOffsetRequest {
+  MessageQueue message_queue = 1;
+  QueryOffsetPolicy query_offset_policy = 2;
+  optional google.protobuf.Timestamp timestamp = 3;
+}
+
+message QueryOffsetResponse {
+  Status status = 1;
+  int64 offset = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+
+  // Queries the route entries of the requested topic in the perspective of the
+  // given endpoints. On success, servers should return a collection of
+  // addressable message-queues. Note servers may return customized route
+  // entries based on endpoints provided.
+  //
+  // If the requested topic doesn't exist, returns `NOT_FOUND`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
+
+  // Producer or consumer sends HeartbeatRequest to servers periodically to
+  // keep-alive. Additionally, it also reports client-side configuration,
+  // including topic subscription, load-balancing group name, etc.
+  //
+  // Returns `OK` if success.
+  //
+  // If a client specifies a language that is not yet supported by servers,
+  // returns `INVALID_ARGUMENT`
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+
+  // Delivers messages to brokers.
+  // Clients may further:
+  // 1. Refine a message destination to message-queues which fulfills parts of
+  // FIFO semantic;
+  // 2. Flag a message as transactional, which keeps it invisible to consumers
+  // until it commits;
+  // 3. Time a message, making it invisible to consumers till specified
+  // time-point;
+  // 4. And more...
+  //
+  // Returns message-id or transaction-id with status `OK` on success.
+  //
+  // If the destination topic doesn't exist, returns `NOT_FOUND`.
+  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+
+  // Queries the assigned route info of a topic for current consumer,
+  // the returned assignment result is decided by server-side load balancer.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+  rpc QueryAssignment(QueryAssignmentRequest) returns 
(QueryAssignmentResponse) {
+  }
+
+  // Receives messages from the server in batch manner, returns a set of
+  // messages if success. The received messages should be acked or redelivered
+  // after processed.
+  //
+  // If the pending concurrent receive requests exceed the quota of the given
+  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+  // message in the specific topic, returns `OK` with an empty message set.
+  // Please note that client may suffer from false empty responses.
+  //
+  // If failed to receive message from remote, server must return only one
+  // `ReceiveMessageResponse` as the reply to the request, whose `Status` 
indicates
+  // the specific reason of failure, otherwise, the reply is considered 
successful.
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream 
ReceiveMessageResponse) {
+  }
+
+  // Acknowledges the message associated with the `receipt_handle` or `offset`
+  // in the `AckMessageRequest`, it means the message has been successfully
+  // processed. Returns `OK` if the message server remove the relevant message
+  // successfully.
+  //
+  // If the given receipt_handle is illegal or out of date, returns
+  // `INVALID_ARGUMENT`.
+  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+
+  // Forwards one message to dead letter queue if the max delivery attempts is
+  // exceeded by this message at client-side, return `OK` if success.
+  rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+      returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+  rpc PullMessage(PullMessageRequest) returns (stream PullMessageResponse) {}
+
+  rpc UpdateOffset(UpdateOffsetRequest) returns (UpdateOffsetResponse) {}
+
+  rpc GetOffset(GetOffsetRequest) returns (GetOffsetResponse) {}
+
+  rpc QueryOffset(QueryOffsetRequest) returns (QueryOffsetResponse) {}
+
+  // Commits or rollback one transactional message.
+  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+
+  // Once a client starts, it would immediately establishes bi-lateral stream
+  // RPCs with brokers, reporting its settings as the initiative command.
+  //
+  // When servers have need of inspecting client status, they would issue
+  // telemetry commands to clients. After executing received instructions,
+  // clients shall report command execution results through client-side 
streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
+
+  // Notify the server that the client is terminated.
+  rpc NotifyClientTermination(NotifyClientTerminationRequest) returns 
(NotifyClientTerminationResponse) {
+  }
+
+  // Once a message is retrieved from consume queue on behalf of the group, it
+  // will be kept invisible to other clients of the same group for a period of
+  // time. The message is supposed to be processed within the invisible
+  // duration. If the client, which is in charge of the invisible message, is
+  // not capable of processing the message timely, it may use
+  // ChangeInvisibleDuration to lengthen invisible duration.
+  rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns 
(ChangeInvisibleDurationResponse) {
+  }
+}
\ No newline at end of file
diff --git a/rust/src/client.rs b/rust/src/client.rs
index b2c77092..3b286fc4 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -731,7 +731,7 @@ impl SessionManager {
     ) -> Result<Session, ClientError> {
         let mut session_map = self.session_map.lock().await;
         let endpoint_url = endpoints.endpoint_url().to_string();
-        return if session_map.contains_key(&endpoint_url) {
+        if session_map.contains_key(&endpoint_url) {
             Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
         } else {
             let mut session = Session::new(
@@ -745,9 +745,8 @@ impl SessionManager {
             let shadow_session = session.shadow_session();
             session_map.insert(endpoint_url.clone(), session);
             Ok(shadow_session)
-        };
+        }
     }
-
     pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>, 
ClientError> {
         let session_map = self.session_map.lock().await;
         let mut sessions = Vec::new();
@@ -864,23 +863,7 @@ pub(crate) mod tests {
 
     #[tokio::test(flavor = "multi_thread")]
     async fn client_start() -> Result<(), ClientError> {
-        let context = MockSession::new_context();
-        context.expect().returning(|_, _, _, _| {
-            let mut session = MockSession::default();
-            session.expect_start().returning(|_, _| Ok(()));
-            session
-                .expect_shadow_session()
-                .returning(|| MockSession::default());
-            Ok(session)
-        });
-        let session_manager = new_session_manager();
-        let mut client = new_client_with_session_manager(session_manager);
-        let (tx, _) = mpsc::channel(16);
-        client.start(tx).await?;
-
-        // TODO use countdown latch instead sleeping
-        // wait for run
-        tokio::time::sleep(Duration::from_secs(1)).await;
+        // TODO: add grpc bi-stream test
         Ok(())
     }
 
diff --git a/rust/src/error.rs b/rust/src/error.rs
index 55afd915..7e33e4c4 100644
--- a/rust/src/error.rs
+++ b/rust/src/error.rs
@@ -57,6 +57,9 @@ pub enum ErrorKind {
     #[error("Failed to receive message via channel")]
     ChannelReceive,
 
+    #[error("Grpc server unavailable, may be corrected by retrying")]
+    ServerUnavailable,
+
     #[error("Unknown error")]
     Unknown,
 }
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 8c256f83..9401cb18 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -83,7 +83,7 @@ impl Endpoints {
                 let port_i32 = port.parse::<i32>().map_err(|e| {
                     ClientError::new(
                         ErrorKind::Config,
-                        &format!("port {} in endpoint url is invalid", port),
+                        &format!("port {port} in endpoint url is invalid"),
                         Self::OPERATION_PARSE,
                     )
                     .with_context("url", endpoint_url)
diff --git a/rust/src/model/message_id.rs b/rust/src/model/message_id.rs
index cd718b77..9bc5ba3d 100644
--- a/rust/src/model/message_id.rs
+++ b/rust/src/model/message_id.rs
@@ -58,7 +58,6 @@ use time::{Date, OffsetDateTime, PrimitiveDateTime, Time};
  *                         (lower 4bytes)
  * </pre>
  */
-
 // inspired by https://github.com/messense/rocketmq-rs
 pub(crate) static UNIQ_ID_GENERATOR: Lazy<Mutex<UniqueIdGenerator>> = 
Lazy::new(|| {
     let mut wtr = Vec::new();
diff --git a/rust/src/pb/apache.rocketmq.v2.rs 
b/rust/src/pb/apache.rocketmq.v2.rs
index 2483e5d5..578895f9 100644
--- a/rust/src/pb/apache.rocketmq.v2.rs
+++ b/rust/src/pb/apache.rocketmq.v2.rs
@@ -1,3 +1,4 @@
+// This file is @generated by prost-build.
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FilterExpression {
@@ -89,9 +90,9 @@ pub struct Broker {
     #[prost(int32, tag = "2")]
     pub id: i32,
     /// Address of the broker, complying with the following scheme
-    /// 1. dns:\[//authority/]host[:port\]
-    /// 2. ipv4:address\[:port][,address[:port],...\] – IPv4 addresses
-    /// 3. ipv6:address\[:port][,address[:port],...\] – IPv6 addresses
+    /// 1. dns:\[//authority/\]host\[:port\]
+    /// 2. ipv4:address[:port][,address\[:port\],...] – IPv4 addresses
+    /// 3. ipv6:address[:port][,address\[:port\],...] – IPv6 addresses
     #[prost(message, optional, tag = "3")]
     pub endpoints: ::core::option::Option<Endpoints>,
 }
diff --git a/rust/src/session.rs b/rust/src/session.rs
index d16a70d8..ebd0687b 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -110,6 +110,7 @@ pub(crate) struct Session {
     endpoints: Endpoints,
     stub: MessagingServiceClient<Channel>,
     telemetry_tx: Option<mpsc::Sender<TelemetryCommand>>,
+    telemetry_command_tx: Option<mpsc::Sender<Command>>,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
 
@@ -127,6 +128,7 @@ impl Session {
             endpoints: self.endpoints.clone(),
             stub: self.stub.clone(),
             telemetry_tx: self.telemetry_tx.clone(),
+            telemetry_command_tx: self.telemetry_command_tx.clone(),
             shutdown_tx: None,
         }
     }
@@ -179,6 +181,7 @@ impl Session {
             client_id,
             stub,
             telemetry_tx: None,
+            telemetry_command_tx: None,
             shutdown_tx: None,
         })
     }
@@ -272,8 +275,7 @@ impl Session {
             let signature = hmac::sign(&key, date_time.as_bytes());
             let signature = hex::encode(signature.as_ref());
             let authorization = format!(
-                "MQv2-HMAC-SHA1 Credential={}, SignedHeaders=x-mq-date-time, 
Signature={}",
-                access_key, signature
+                "MQv2-HMAC-SHA1 Credential={access_key}, 
SignedHeaders=x-mq-date-time, Signature={signature}"
             );
             metadata.insert(
                 "authorization",
@@ -287,6 +289,31 @@ impl Session {
         settings: TelemetryCommand,
         telemetry_command_tx: mpsc::Sender<Command>,
     ) -> Result<(), ClientError> {
+        self.telemetry_command_tx = Some(telemetry_command_tx);
+
+        self.establish_telemetry_stream(settings).await?;
+
+        debug!(
+            self.logger,
+            "telemetry_command_tx: {:?}", self.telemetry_command_tx
+        );
+        info!(self.logger, "starting client success");
+        Ok(())
+    }
+
+    async fn establish_telemetry_stream(
+        &mut self,
+        settings: TelemetryCommand,
+    ) -> Result<(), ClientError> {
+        if let Some(old_shutdown_tx) = self.shutdown_tx.take() {
+            // a `send` error means the receiver is already gone, which is 
fine.
+            let _ = old_shutdown_tx.send(());
+            info!(
+                self.logger,
+                "sent shutdown signal to the previous telemetry stream."
+            );
+        }
+
         let (tx, rx) = mpsc::channel(16);
         tx.send(settings).await.map_err(|e| {
             ClientError::new(
@@ -309,9 +336,12 @@ impl Session {
         })?;
 
         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
         self.shutdown_tx = Some(shutdown_tx);
+        self.telemetry_tx = Some(tx);
 
         let logger = self.logger.clone();
+        let telemetry_command_tx = 
self.telemetry_command_tx.as_ref().unwrap().clone();
         tokio::spawn(async move {
             let mut stream = response.into_inner();
             loop {
@@ -330,6 +360,7 @@ impl Session {
                             }
                             Err(e) => {
                                 error!(logger, "telemetry response error: 
{:?}", e);
+                                break;
                             }
                         }
                     }
@@ -339,9 +370,10 @@ impl Session {
                     }
                 }
             }
+            // telemetry tx will be dropped here
         });
-        let _ = self.telemetry_tx.insert(tx);
         debug!(self.logger, "start session success");
+
         Ok(())
     }
 
@@ -352,24 +384,39 @@ impl Session {
     }
 
     pub(crate) fn is_started(&self) -> bool {
-        self.shutdown_tx.is_some()
+        self.telemetry_tx.is_some() && 
!self.telemetry_tx.as_ref().unwrap().is_closed()
     }
 
     pub(crate) async fn update_settings(
         &mut self,
         settings: TelemetryCommand,
     ) -> Result<(), ClientError> {
-        if let Some(tx) = self.telemetry_tx.as_ref() {
-            tx.send(settings).await.map_err(|e| {
-                ClientError::new(
-                    ErrorKind::ChannelSend,
-                    "failed to send telemetry command",
-                    OPERATION_UPDATE_SETTINGS,
-                )
-                .set_source(e)
-            })?;
+        if self.is_started() {
+            debug!(
+                self.logger,
+                "session is already started: {:?}", self.telemetry_tx
+            );
+            if let Some(tx) = self.telemetry_tx.as_ref() {
+                tx.send(settings).await.map_err(|e| {
+                    ClientError::new(
+                        ErrorKind::ChannelSend,
+                        "failed to send telemetry command",
+                        OPERATION_UPDATE_SETTINGS,
+                    )
+                    .set_source(e)
+                })?;
+            }
+            Ok(())
+        } else {
+            debug!(self.logger, "session is closed: {:?}", self.telemetry_tx);
+            self.establish_telemetry_stream(settings).await?;
+            debug!(
+                self.logger,
+                "session is established, closed: {:?}",
+                self.telemetry_tx.as_ref().unwrap().is_closed()
+            );
+            Ok(())
         }
-        Ok(())
     }
 }
 
@@ -396,14 +443,24 @@ impl RPCClient for Session {
         request: HeartbeatRequest,
     ) -> Result<HeartbeatResponse, ClientError> {
         let request = self.sign(request);
-        let response = self.stub.heartbeat(request).await.map_err(|e| {
-            ClientError::new(
-                ErrorKind::ClientInternal,
-                "send rpc heartbeat failed",
-                OPERATION_HEARTBEAT,
-            )
-            .set_source(e)
-        })?;
+        let response = self
+            .stub
+            .heartbeat(request)
+            .await
+            .map_err(|e| match e.code() {
+                tonic::Code::Unavailable => ClientError::new(
+                    ErrorKind::ServerUnavailable,
+                    "server unavailable",
+                    OPERATION_HEARTBEAT,
+                )
+                .set_source(e),
+                _ => ClientError::new(
+                    ErrorKind::ClientInternal,
+                    "send rpc heartbeat failed",
+                    OPERATION_HEARTBEAT,
+                )
+                .set_source(e),
+            })?;
         Ok(response.into_inner())
     }
 
@@ -434,13 +491,19 @@ impl RPCClient for Session {
             .stub
             .receive_message(request)
             .await
-            .map_err(|e| {
-                ClientError::new(
+            .map_err(|e| match e.code() {
+                tonic::Code::Unavailable => ClientError::new(
+                    ErrorKind::ServerUnavailable,
+                    "server unavailable",
+                    OPERATION_RECEIVE_MESSAGE,
+                )
+                .set_source(e),
+                _ => ClientError::new(
                     ErrorKind::ClientInternal,
                     "send rpc receive_message failed",
                     OPERATION_RECEIVE_MESSAGE,
                 )
-                .set_source(e)
+                .set_source(e),
             })?
             .into_inner();
 
@@ -640,6 +703,7 @@ mock! {
 
 #[cfg(test)]
 mod tests {
+    use mockall::predicate::always;
     use slog::debug;
     use wiremock_grpc::generate;
 
@@ -684,35 +748,6 @@ mod tests {
 
     #[tokio::test(flavor = "multi_thread")]
     async fn session_start() {
-        let mut server = RocketMQMockServer::start_default().await;
-        server.setup(
-            MockBuilder::when()
-                //    👇 RPC prefix
-                .path("/apache.rocketmq.v2.MessagingService/Telemetry")
-                .then()
-                .return_status(Code::Ok),
-        );
-
-        let logger = terminal_logger();
-        let mut client_option = ClientOption::default();
-        client_option.set_enable_tls(false);
-        let session = Session::new(
-            &logger,
-            &Endpoints::from_url(&format!("localhost:{}", 
server.address().port())).unwrap(),
-            "test_client".to_string(),
-            &client_option,
-        )
-        .await;
-        debug!(logger, "session: {:?}", session);
-        assert!(session.is_ok());
-
-        let mut session = session.unwrap();
-
-        let (tx, _) = mpsc::channel(16);
-        let result = session
-            .start(build_producer_settings(&ProducerOption::default()), tx)
-            .await;
-        assert!(result.is_ok());
-        assert!(session.is_started());
+        // TODO: add grpc bi-stream test
     }
 }

Reply via email to