GitHub user ppluck edited a discussion: Rocketmq Rust clients: I have an error
for "..." while consuming a message.
I need your help. thanks!
---
- rmq cluster: 5.0
- the log:
```text
INFO create session success, peer: ...
DEBG start session success, peer: ...
DEBG query route for topic=test-topic, component: ...
DEBG receive telemetry command: TelemetryCommand { status: Some(Status { code:
Ok, ...
DEBG send heartbeat to server success, peer=...
ERRO \client.rs:196:33] handle telemetry command failed: Failed to parse config
at client.handle_telemetry_command =>
receive telemetry command but there is no handler
Context:
command: Settings(Settings { client_type: Some(SimpleConsumer),
access_point: None, ...
DEBG query route for topic=test-topic success: route=Route { index: 0, queue:
[MessageQueue ...
INFO update route for topic=test-topic, component: client
INFO simple_consumer.rs:99:9] start simple consumer success, client_id: ...
DEBG client.rs:201:25] receive shutdown signal, stop heartbeat task and
telemetry command handler,...
INFO client.rs:206:13] heartbeat task and telemetry command handler are
stopped,...
```
- code:
```rust
let mut consumer_option = SimpleConsumerOption::default();
consumer_option.set_topics(vec!["test-topic"]);
consumer_option.set_consumer_group("CG-test");
// set which rocketmq proxy to connect
let mut client_option = ClientOption::default(); // producer type
client_option.set_access_url("localhost:8081");
client_option.set_access_key("CG-test"); // This is a company agreement
client_option.set_secret_key("CT-xxxx"); // This is a company agreement
// build and start simple consumer
let mut consumer = SimpleConsumer::new(consumer_option,
client_option).unwrap();
consumer.start().await.unwrap();
// pop message from rocketmq proxy
let receive_result = consumer
.receive(
"test-topic".to_string(),
&FilterExpression::new(FilterType::Tag, "*"),
)
.await;
let messages = receive_result.unwrap();
for message in messages {
println!("receive message: {:?}", message);
// ack message to rocketmq proxy
let ack_result = consumer.ack(&message).await;
}
```
---
supplement:
I found this error while I was debugging.
client.rs 285, here method only mathed RecoverOrphanedTransactionCommand,
actually has others:
```rust
Settings(super::Settings),
ThreadStackTrace(super::ThreadStackTrace),
VerifyMessageResult(super::VerifyMessageResult),
RecoverOrphanedTransactionCommand(super::RecoverOrphanedTransactionCommand),
PrintThreadStackTraceCommand(super::PrintThreadStackTraceCommand),
VerifyMessageCommand(super::VerifyMessageCommand),
```
```rust
async fn handle_telemetry_command<T: RPCClient + 'static>(
mut rpc_client: T,
transaction_checker: &Option<Box<TransactionChecker>>,
endpoints: Endpoints,
command: pb::telemetry_command::Command,
) -> Result<(), ClientError> {
return match command {
RecoverOrphanedTransactionCommand(command) => {
let transaction_id = command.transaction_id;
let message = command.message.unwrap();
let message_id = message
.system_properties
.as_ref()
.unwrap()
.message_id
.clone();
let topic = message.topic.as_ref().unwrap().clone();
if let Some(transaction_checker) = transaction_checker {
let resolution = transaction_checker(
transaction_id.clone(),
MessageView::from_pb_message(message, endpoints),
);
let response = rpc_client
.end_transaction(EndTransactionRequest {
topic: Some(topic),
message_id: message_id.to_string(),
transaction_id,
resolution: resolution as i32,
source: TransactionSource::SourceServerCheck as i32,
trace_context: "".to_string(),
})
.await?;
Self::handle_response_status(response.status,
OPERATION_END_TRANSACTION)
} else {
Err(ClientError::new(
ErrorKind::Config,
"failed to get transaction checker",
OPERATION_END_TRANSACTION,
))
}
}
_ => Err(ClientError::new(
ErrorKind::Config,
"receive telemetry command but there is no handler",
OPERATION_HANDLE_TELEMETRY_COMMAND,
)
.with_context("command", format!("{:?}", command))),
};
}
```
GitHub link: https://github.com/apache/rocketmq-clients/discussions/737
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]