This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 9fec02bc feat(rust): support changing invisible duration (#623)
9fec02bc is described below
commit 9fec02bcf4711355532f953b8d892d2724c92544
Author: SSpirits <[email protected]>
AuthorDate: Mon Oct 16 11:07:38 2023 +0800
feat(rust): support changing invisible duration (#623)
* feat(rust): support changing invisible duration
Signed-off-by: SSpirits <[email protected]>
* fix(rust): fix clippy warning
Signed-off-by: SSpirits <[email protected]>
* feat(rust): configure the example branch using features
Signed-off-by: SSpirits <[email protected]>
* fix(rust): fix clippy warning
Signed-off-by: SSpirits <[email protected]>
---------
Signed-off-by: SSpirits <[email protected]>
---
rust/Cargo.toml | 15 ++++--
rust/examples/producer.rs | 2 +-
rust/examples/simple_consumer.rs | 42 ++++++++++++++---
rust/src/client.rs | 98 ++++++++++++++++++++++++++++++++++------
rust/src/model/common.rs | 3 +-
rust/src/model/message.rs | 6 +--
rust/src/model/transaction.rs | 4 +-
rust/src/session.rs | 31 +++++++++++--
rust/src/simple_consumer.rs | 13 ++++++
rust/src/util.rs | 2 +-
10 files changed, 177 insertions(+), 39 deletions(-)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 80af0bb9..6cc106bb 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -33,11 +33,11 @@ keywords = ["rocketmq", "api", "client", "sdk", "grpc"]
[dependencies]
tokio = { version = "1", features = ["full"] }
-tokio-rustls = {version = "0.24.0", features = ["default",
"dangerous_configuration"] }
-tokio-stream="0.1.12"
+tokio-rustls = { version = "0.24.0", features = ["default",
"dangerous_configuration"] }
+tokio-stream = "0.1.12"
async-trait = "0.1.68"
lazy_static = "1.4"
-tonic = {version = "0.9.0", features = ["tls", "default", "channel",
"tls-roots"]}
+tonic = { version = "0.9.0", features = ["tls", "default", "channel",
"tls-roots"] }
prost = "0.11.8"
prost-types = "0.11.8"
@@ -47,7 +47,7 @@ parking_lot = "0.12"
hostname = "0.3.1"
os_info = "3"
-slog = {version = "2.7.0", features=["max_level_trace",
"release_max_level_info"]}
+slog = { version = "2.7.0", features = ["max_level_trace",
"release_max_level_info"] }
slog-term = "2.9.0"
slog-async = "2.7.0"
slog-json = "2.6.1"
@@ -63,7 +63,7 @@ time = "0.3"
once_cell = "1.9.0"
mockall = "0.11.4"
-mockall_double= "0.3.0"
+mockall_double = "0.3.0"
siphasher = "0.3.10"
ring = "0.16.20"
@@ -78,3 +78,8 @@ regex = "1.7.3"
wiremock-grpc = "0.0.3-alpha2"
futures = "0.3"
awaitility = "0.3.0"
+
+[features]
+default = ["example_ack"]
+example_ack = []
+example_change_invisible_duration = []
\ No newline at end of file
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 3e818e91..7179c9d2 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -59,7 +59,7 @@ async fn main() {
// shutdown the producer when you don't need it anymore.
// you should shutdown it manually to gracefully stop and unregister from
server
let shutdown_result = producer.shutdown().await;
- if shutdown_result.is_ok() {
+ if shutdown_result.is_err() {
eprintln!(
"producer shutdown failed: {:?}",
shutdown_result.unwrap_err()
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index fc1a8388..d9bb06da 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -14,6 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#[cfg(feature = "example_change_invisible_duration")]
+use std::time::Duration;
+
use rocketmq::conf::{ClientOption, SimpleConsumerOption};
use rocketmq::model::common::{FilterExpression, FilterType};
use rocketmq::SimpleConsumer;
@@ -63,14 +66,39 @@ async fn main() {
for message in messages {
println!("receive message: {:?}", message);
- // ack message to rocketmq proxy
- let ack_result = consumer.ack(&message).await;
- if ack_result.is_err() {
- eprintln!(
- "ack message {} failed: {:?}",
- message.message_id(),
- ack_result.unwrap_err()
+
+ // Do your business logic here
+ // And then acknowledge the message to the RocketMQ proxy if
everything is okay
+ #[cfg(feature = "example_ack")]
+ {
+ println!("ack message {}", message.message_id());
+ let ack_result = consumer.ack(&message).await;
+ if ack_result.is_err() {
+ eprintln!(
+ "ack message {} failed: {:?}",
+ message.message_id(),
+ ack_result.unwrap_err()
+ );
+ }
+ }
+
+ // Otherwise, you can retry this message later by changing the
invisible duration
+ #[cfg(feature = "example_change_invisible_duration")]
+ {
+ println!(
+ "Delay next visible time of message {} by 10s",
+ message.message_id()
);
+ let change_invisible_duration_result = consumer
+ .change_invisible_duration(&message, Duration::from_secs(10))
+ .await;
+ if change_invisible_duration_result.is_err() {
+ eprintln!(
+ "change message {} invisible duration failed: {:?}",
+ message.message_id(),
+ change_invisible_duration_result.unwrap_err()
+ );
+ }
}
}
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 3804b4ae..ae034682 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -34,12 +34,12 @@ use crate::model::message::{AckMessageEntry, MessageView};
use crate::model::transaction::{TransactionChecker, TransactionResolution};
use crate::pb;
use crate::pb::receive_message_response::Content;
-use crate::pb::telemetry_command::Command::RecoverOrphanedTransactionCommand;
+use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand,
Settings};
use crate::pb::{
- AckMessageRequest, AckMessageResultEntry, Code, EndTransactionRequest,
FilterExpression,
- HeartbeatRequest, HeartbeatResponse, Message, MessageQueue,
NotifyClientTerminationRequest,
- QueryRouteRequest, ReceiveMessageRequest, Resource, SendMessageRequest,
Status,
- TelemetryCommand, TransactionSource,
+ AckMessageRequest, AckMessageResultEntry, ChangeInvisibleDurationRequest,
Code,
+ EndTransactionRequest, FilterExpression, HeartbeatRequest,
HeartbeatResponse, Message,
+ MessageQueue, NotifyClientTerminationRequest, QueryRouteRequest,
ReceiveMessageRequest,
+ Resource, SendMessageRequest, Status, TelemetryCommand, TransactionSource,
};
#[double]
use crate::session::SessionManager;
@@ -282,6 +282,7 @@ impl Client {
))
}
}
+ Settings(_) => Ok(()),
_ => Err(ClientError::new(
ErrorKind::Config,
"receive telemetry command but there is no handler",
@@ -291,7 +292,6 @@ impl Client {
};
}
- #[allow(dead_code)]
pub(crate) fn client_id(&self) -> &str {
&self.id
}
@@ -378,7 +378,6 @@ impl Client {
})
}
- #[allow(dead_code)]
pub(crate) async fn topic_route(
&self,
topic: &str,
@@ -461,8 +460,7 @@ impl Client {
let result = self.query_topic_route(rpc_client, topic).await;
// send result to all waiters
- if result.is_ok() {
- let route = result.unwrap();
+ if let Ok(route) = result {
debug!(
self.logger,
"query route for topic={} success: route={:?}", topic, route
@@ -518,7 +516,6 @@ impl Client {
Ok(response)
}
- #[allow(dead_code)]
pub(crate) async fn send_message(
&self,
endpoints: &Endpoints,
@@ -547,7 +544,6 @@ impl Client {
.collect())
}
- #[allow(dead_code)]
pub(crate) async fn receive_message(
&self,
endpoints: &Endpoints,
@@ -608,7 +604,6 @@ impl Client {
Ok(messages)
}
- #[allow(dead_code)]
pub(crate) async fn ack_message<T: AckMessageEntry + 'static>(
&self,
ack_entry: &T,
@@ -649,6 +644,51 @@ impl Client {
Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
Ok(response.entries)
}
+
+ pub(crate) async fn change_invisible_duration<T: AckMessageEntry +
'static>(
+ &self,
+ ack_entry: &T,
+ invisible_duration: Duration,
+ ) -> Result<String, ClientError> {
+ let result = self
+ .change_invisible_duration_inner(
+ self.get_session_with_endpoints(ack_entry.endpoints())
+ .await
+ .unwrap(),
+ ack_entry.topic(),
+ ack_entry.receipt_handle(),
+ invisible_duration,
+ ack_entry.message_id(),
+ )
+ .await?;
+ Ok(result)
+ }
+
+ pub(crate) async fn change_invisible_duration_inner<T: RPCClient +
'static>(
+ &self,
+ mut rpc_client: T,
+ topic: String,
+ receipt_handle: String,
+ invisible_duration: Duration,
+ message_id: String,
+ ) -> Result<String, ClientError> {
+ let request = ChangeInvisibleDurationRequest {
+ group: Some(Resource {
+ name: self.option.group.as_ref().unwrap().to_string(),
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ topic: Some(Resource {
+ name: topic,
+ resource_namespace: self.option.namespace.to_string(),
+ }),
+ receipt_handle,
+ invisible_duration: Some(invisible_duration),
+ message_id,
+ };
+ let response = rpc_client.change_invisible_duration(request).await?;
+ Self::handle_response_status(response.status, OPERATION_ACK_MESSAGE)?;
+ Ok(response.receipt_handle)
+ }
}
#[cfg(test)]
@@ -668,9 +708,10 @@ pub(crate) mod tests {
use crate::model::transaction::TransactionResolution;
use crate::pb::receive_message_response::Content;
use crate::pb::{
- AckMessageEntry, AckMessageResponse, Code, EndTransactionResponse,
FilterExpression,
- HeartbeatResponse, Message, MessageQueue, QueryRouteResponse,
ReceiveMessageResponse,
- Resource, SendMessageResponse, Status, SystemProperties,
TelemetryCommand,
+ AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse,
Code,
+ EndTransactionResponse, FilterExpression, HeartbeatResponse, Message,
MessageQueue,
+ QueryRouteResponse, ReceiveMessageResponse, Resource,
SendMessageResponse, Status,
+ SystemProperties, TelemetryCommand,
};
use crate::session;
@@ -1045,6 +1086,33 @@ pub(crate) mod tests {
assert_eq!(ack_result.unwrap().len(), 0);
}
+ #[tokio::test]
+ async fn client_change_invisible_duration() {
+ let response = Ok(ChangeInvisibleDurationResponse {
+ status: Some(Status {
+ code: Code::Ok as i32,
+ message: "Success".to_string(),
+ }),
+ receipt_handle: "receipt_handle".to_string(),
+ });
+ let mut mock = session::MockRPCClient::new();
+ mock.expect_change_invisible_duration()
+ .return_once(|_| Box::pin(futures::future::ready(response)));
+
+ let client = new_client_for_test();
+ let change_invisible_duration_result = client
+ .change_invisible_duration_inner(
+ mock,
+ "test_topic".to_string(),
+ "receipt_handle".to_string(),
+ prost_types::Duration::default(),
+ "message_id".to_string(),
+ )
+ .await;
+ assert!(change_invisible_duration_result.is_ok());
+ assert_eq!(change_invisible_duration_result.unwrap(),
"receipt_handle");
+ }
+
#[tokio::test]
async fn client_ack_message_failed() {
let response = Ok(AckMessageResponse {
diff --git a/rust/src/model/common.rs b/rust/src/model/common.rs
index 919d4b36..5dee554c 100644
--- a/rust/src/model/common.rs
+++ b/rust/src/model/common.rs
@@ -27,12 +27,13 @@ use crate::error::{ClientError, ErrorKind};
use crate::pb;
use crate::pb::{Address, AddressScheme, MessageQueue};
-#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) enum ClientType {
Producer = 1,
+ #[allow(dead_code)]
PushConsumer = 2,
SimpleConsumer = 3,
+ #[allow(dead_code)]
PullConsumer = 4,
}
diff --git a/rust/src/model/message.rs b/rust/src/model/message.rs
index a2e9405a..28c232e8 100644
--- a/rust/src/model/message.rs
+++ b/rust/src/model/message.rs
@@ -59,7 +59,7 @@ impl Message for MessageImpl {
}
fn take_body(&mut self) -> Vec<u8> {
- self.body.take().unwrap_or(vec![])
+ self.body.take().unwrap_or_default()
}
fn take_tag(&mut self) -> Option<String> {
@@ -67,11 +67,11 @@ impl Message for MessageImpl {
}
fn take_keys(&mut self) -> Vec<String> {
- self.keys.take().unwrap_or(vec![])
+ self.keys.take().unwrap_or_default()
}
fn take_properties(&mut self) -> HashMap<String, String> {
- self.properties.take().unwrap_or(HashMap::new())
+ self.properties.take().unwrap_or_default()
}
fn take_message_group(&mut self) -> Option<String> {
diff --git a/rust/src/model/transaction.rs b/rust/src/model/transaction.rs
index 2f74679f..2a40d7bf 100644
--- a/rust/src/model/transaction.rs
+++ b/rust/src/model/transaction.rs
@@ -102,11 +102,11 @@ impl TransactionImpl {
#[async_trait]
impl Transaction for TransactionImpl {
async fn commit(mut self) -> Result<(), ClientError> {
- return self.end_transaction(TransactionResolution::COMMIT).await;
+ self.end_transaction(TransactionResolution::COMMIT).await
}
async fn rollback(mut self) -> Result<(), ClientError> {
- return self.end_transaction(TransactionResolution::ROLLBACK).await;
+ self.end_transaction(TransactionResolution::ROLLBACK).await
}
fn message_id(&self) -> &str {
diff --git a/rust/src/session.rs b/rust/src/session.rs
index 9441f7c5..d54894d8 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -33,7 +33,8 @@ use crate::error::ErrorKind;
use crate::model::common::Endpoints;
use crate::pb::telemetry_command::Command;
use crate::pb::{
- AckMessageRequest, AckMessageResponse, EndTransactionRequest,
EndTransactionResponse,
+ AckMessageRequest, AckMessageResponse, ChangeInvisibleDurationRequest,
+ ChangeInvisibleDurationResponse, EndTransactionRequest,
EndTransactionResponse,
HeartbeatRequest, HeartbeatResponse, NotifyClientTerminationRequest,
NotifyClientTerminationResponse, QueryRouteRequest, QueryRouteResponse,
ReceiveMessageRequest,
ReceiveMessageResponse, SendMessageRequest, SendMessageResponse,
TelemetryCommand,
@@ -49,6 +50,7 @@ const OPERATION_HEARTBEAT: &str = "rpc.heartbeat";
const OPERATION_SEND_MESSAGE: &str = "rpc.send_message";
const OPERATION_RECEIVE_MESSAGE: &str = "rpc.receive_message";
const OPERATION_ACK_MESSAGE: &str = "rpc.ack_message";
+const OPERATION_CHANGE_INVISIBLE_DURATION: &str =
"rpc.change_invisible_duration";
const OPERATION_END_TRANSACTION: &str = "rpc.end_transaction";
const OPERATION_NOTIFY_CLIENT_TERMINATION: &str =
"rpc.notify_client_termination";
@@ -75,6 +77,10 @@ pub(crate) trait RPCClient {
&mut self,
request: AckMessageRequest,
) -> Result<AckMessageResponse, ClientError>;
+ async fn change_invisible_duration(
+ &mut self,
+ request: ChangeInvisibleDurationRequest,
+ ) -> Result<ChangeInvisibleDurationResponse, ClientError>;
async fn end_transaction(
&mut self,
request: EndTransactionRequest,
@@ -85,7 +91,6 @@ pub(crate) trait RPCClient {
) -> Result<NotifyClientTerminationResponse, ClientError>;
}
-#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct Session {
logger: Logger,
@@ -353,7 +358,6 @@ impl Session {
}
}
- #[allow(dead_code)]
pub(crate) fn is_started(&self) -> bool {
self.shutdown_tx.is_some()
}
@@ -489,6 +493,26 @@ impl RPCClient for Session {
Ok(response.into_inner())
}
+ async fn change_invisible_duration(
+ &mut self,
+ request: ChangeInvisibleDurationRequest,
+ ) -> Result<ChangeInvisibleDurationResponse, ClientError> {
+ let request = self.sign(request);
+ let response = self
+ .stub
+ .change_invisible_duration(request)
+ .await
+ .map_err(|e| {
+ ClientError::new(
+ ErrorKind::ClientInternal,
+ "send rpc change_invisible_duration failed",
+ OPERATION_CHANGE_INVISIBLE_DURATION,
+ )
+ .set_source(e)
+ })?;
+ Ok(response.into_inner())
+ }
+
async fn end_transaction(
&mut self,
request: EndTransactionRequest,
@@ -571,7 +595,6 @@ impl SessionManager {
};
}
- #[allow(dead_code)]
pub(crate) async fn get_all_sessions(&self) -> Result<Vec<Session>,
ClientError> {
let session_map = self.session_map.lock().await;
let mut sessions = Vec::new();
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index a8777053..f8a6eace 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -175,6 +175,19 @@ impl SimpleConsumer {
self.client.ack_message(ack_entry).await?;
Ok(())
}
+
+ pub async fn change_invisible_duration(
+ &self,
+ ack_entry: &(impl AckMessageEntry + 'static),
+ invisible_duration: Duration,
+ ) -> Result<String, ClientError> {
+ self.client
+ .change_invisible_duration(
+ ack_entry,
+ prost_types::Duration::try_from(invisible_duration).unwrap(),
+ )
+ .await
+ }
}
#[cfg(test)]
diff --git a/rust/src/util.rs b/rust/src/util.rs
index e25d2622..6ea92cb5 100644
--- a/rust/src/util.rs
+++ b/rust/src/util.rs
@@ -92,7 +92,7 @@ pub(crate) fn build_producer_settings(
let topics = option
.topics()
.clone()
- .unwrap_or(vec![])
+ .unwrap_or_default()
.iter()
.map(|topic| Resource {
name: topic.to_string(),