This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 759aa42d [ISSUE #1040] [Rust] Migrate from slog to tracing logging framework
759aa42d is described below
commit 759aa42d38052210b1465f6f997136d32c106788
Author: Adam Basfop Cavendish <[email protected]>
AuthorDate: Wed Jul 23 15:52:57 2025 +0800
[ISSUE #1040] [Rust] Migrate from slog to tracing logging framework
- Replace slog dependency with tracing for improved logging capabilities
- Remove custom log.rs module and conf.rs logging configuration
- Update all modules (client, producer, consumers, session) to use tracing
- Simplify logging setup and improve observability
---
rust/Cargo.toml | 12 +-
rust/examples/delay_producer.rs | 2 +
rust/examples/fifo_producer.rs | 2 +
rust/examples/producer.rs | 2 +
rust/examples/push_consumer.rs | 2 +
rust/examples/simple_consumer.rs | 2 +
rust/examples/transaction_producer.rs | 2 +
rust/src/client.rs | 80 ++++---------
rust/src/conf.rs | 43 -------
rust/src/lib.rs | 2 -
rust/src/log.rs | 55 ---------
rust/src/producer.rs | 28 ++---
rust/src/push_consumer.rs | 208 +++++++++-------------------------
rust/src/session.rs | 61 +++-------
rust/src/simple_consumer.rs | 26 ++---
15 files changed, 125 insertions(+), 402 deletions(-)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 692ab3f5..e55a0bba 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -46,10 +46,7 @@ parking_lot = "0.12"
hostname = "0.3.1"
os_info = "3"
-slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_info"] }
-slog-term = "2.9.0"
-slog-async = "2.7.0"
-slog-json = "2.6.1"
+tracing = "0.1.41"
opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["metrics", "grpc-tonic"] }
@@ -58,7 +55,7 @@ minitrace = "0.4"
byteorder = "1"
mac_address = "1.1.4"
hex = "0.4.3"
-time = { version = "0.3", features = ["local-offset"] }
+time = { version = "0.3", features = ["formatting", "local-offset"] }
once_cell = "1.18.0"
mockall = "0.11.4"
@@ -75,9 +72,10 @@ version_check = "0.9.4"
regex = "1.7.3"
[dev-dependencies]
-wiremock-grpc = "0.0.3-alpha2"
-futures = "0.3"
awaitility = "0.3.0"
+futures = "0.3"
+tracing-subscriber = "0.3.19"
+wiremock-grpc = "0.0.3-alpha2"
[features]
default = ["example_ack"]
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
index cca3baea..87c9b05b 100644
--- a/rust/examples/delay_producer.rs
+++ b/rust/examples/delay_producer.rs
@@ -23,6 +23,8 @@ use rocketmq::Producer;
#[tokio::main]
async fn main() {
+ tracing_subscriber::fmt::init();
+
// It's recommended to specify the topics that applications will publish
messages to
// because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
index 211ae683..5f0b5078 100644
--- a/rust/examples/fifo_producer.rs
+++ b/rust/examples/fifo_producer.rs
@@ -20,6 +20,8 @@ use rocketmq::Producer;
#[tokio::main]
async fn main() {
+ tracing_subscriber::fmt::init();
+
// It's recommended to specify the topics that applications will publish
messages to
// because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 7179c9d2..e92c10be 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -20,6 +20,8 @@ use rocketmq::Producer;
#[tokio::main]
async fn main() {
+ tracing_subscriber::fmt::init();
+
// It's recommended to specify the topics that applications will publish
messages to
// because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
diff --git a/rust/examples/push_consumer.rs b/rust/examples/push_consumer.rs
index 69a6a3ac..3de2691c 100644
--- a/rust/examples/push_consumer.rs
+++ b/rust/examples/push_consumer.rs
@@ -24,6 +24,8 @@ use tokio::time;
#[tokio::main]
async fn main() {
+ tracing_subscriber::fmt::init();
+
let mut client_option = ClientOption::default();
client_option.set_access_url("localhost:8081");
client_option.set_enable_tls(false);
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index d9bb06da..8079432f 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -23,6 +23,8 @@ use rocketmq::SimpleConsumer;
#[tokio::main]
async fn main() {
+ tracing_subscriber::fmt::init();
+
// It's recommended to specify the topics that applications will publish
messages to
// because the simple consumer will prefetch topic routes for them on
start and fail fast in case they do not exist
let mut consumer_option = SimpleConsumerOption::default();
diff --git a/rust/examples/transaction_producer.rs b/rust/examples/transaction_producer.rs
index 4bc621fa..4d8c88d3 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -27,6 +27,8 @@ static MESSAGE_ID_SET: Lazy<Mutex<HashSet<String>>> =
Lazy::new(|| Mutex::new(Ha
#[tokio::main]
async fn main() {
+ tracing_subscriber::fmt::init();
+
// It's recommended to specify the topics that applications will publish
messages to
// because the producer will prefetch topic routes for them on start and
fail fast in case they do not exist
let mut producer_option = ProducerOption::default();
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 3b286fc4..39686821 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -24,10 +24,10 @@ use mockall_double::double;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use prost_types::Duration;
-use slog::{debug, error, info, o, warn, Logger};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
+use tracing::{debug, error, info, warn};
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
@@ -48,7 +48,6 @@ use crate::util::{handle_response_status,
select_message_queue};
#[derive(Debug)]
pub(crate) struct Client {
- logger: Logger,
option: ClientOption,
session_manager: Arc<SessionManager>,
route_manager: TopicRouteManager,
@@ -62,7 +61,6 @@ pub(crate) struct Client {
#[derive(Debug, Clone)]
pub(crate) struct TopicRouteManager {
route_table: Arc<Mutex<HashMap<String /* topic */, RouteStatus>>>,
- logger: Logger,
access_endpoints: Endpoints,
namespace: String,
}
@@ -82,21 +80,16 @@ const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
#[automock]
impl Client {
pub(crate) fn new(
- logger: &Logger,
option: ClientOption,
settings: TelemetryCommand,
) -> Result<Self, ClientError> {
let id = Self::generate_client_id();
let endpoints = Endpoints::from_url(option.access_url())
.map_err(|e| e.with_operation(OPERATION_CLIENT_NEW))?;
- let session_manager = SessionManager::new(logger, id.clone(), &option);
- let route_manager = TopicRouteManager::new(
- logger.clone(),
- option.get_namespace().to_string(),
- endpoints.clone(),
- );
+ let session_manager = SessionManager::new(id.clone(), &option);
+ let route_manager =
+ TopicRouteManager::new(option.get_namespace().to_string(),
endpoints.clone());
Ok(Client {
- logger: logger.new(o!("component" => "client")),
option,
session_manager: Arc::new(session_manager),
route_manager,
@@ -124,7 +117,6 @@ impl Client {
&mut self,
telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command>,
) -> Result<(), ClientError> {
- let logger = self.logger.clone();
let session_manager = self.session_manager.clone();
let group = self.option.group.clone();
@@ -157,7 +149,6 @@ impl Client {
let sessions =
session_manager.get_all_sessions().await;
if sessions.is_err() {
error!(
- logger,
"send heartbeat failed: failed to get
sessions: {}",
sessions.unwrap_err()
);
@@ -169,7 +160,6 @@ impl Client {
let response = Self::heart_beat_inner(session,
&group, &namespace, &client_type).await;
if response.is_err() {
error!(
- logger,
"send heartbeat failed: failed to send
heartbeat rpc: {}",
response.unwrap_err()
);
@@ -179,45 +169,44 @@ impl Client {
handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
if result.is_err() {
error!(
- logger,
"send heartbeat failed: server return
error: {}",
result.unwrap_err()
);
continue;
}
- debug!(logger,"send heartbeat to server success,
peer={}",peer);
+ debug!("send heartbeat to server success,
peer={}",peer);
}
},
_ = sync_settings_interval.tick() => {
let sessions =
session_manager.get_all_sessions().await;
if sessions.is_err() {
- error!(logger, "sync settings failed: failed to
get sessions: {}", sessions.unwrap_err());
+ error!("sync settings failed: failed to get
sessions: {}", sessions.unwrap_err());
continue;
}
for mut session in sessions.unwrap() {
let peer = session.peer().to_string();
let result =
session.update_settings(settings.clone()).await;
if result.is_err() {
- error!(logger, "sync settings failed: failed
to call rpc: {}", result.unwrap_err());
+ error!("sync settings failed: failed to call
rpc: {}", result.unwrap_err());
continue;
}
- debug!(logger, "sync settings success, peer = {}",
peer);
+ debug!("sync settings success, peer = {}", peer);
}
},
_ = sync_route_timer.tick() => {
let result = route_manager.sync_route_data(&mut
rpc_client).await;
if result.is_err() {
- error!(logger, "sync route failed: {}",
result.unwrap_err());
+ error!("sync route failed: {}",
result.unwrap_err());
}
},
_ = &mut shutdown_rx => {
- info!(logger, "receive shutdown signal, stop heartbeat
and telemetry tasks.");
+ info!("receive shutdown signal, stop heartbeat and
telemetry tasks.");
break;
}
}
}
- info!(logger, "heartbeat and telemetry task were stopped");
+ info!("heartbeat and telemetry task were stopped");
});
Ok(())
}
@@ -516,9 +505,8 @@ impl Client {
}
impl TopicRouteManager {
- pub(crate) fn new(logger: Logger, namespace: String, access_endpoints:
Endpoints) -> Self {
+ pub(crate) fn new(namespace: String, access_endpoints: Endpoints) -> Self {
Self {
- logger,
namespace,
access_endpoints,
route_table: Arc::new(Mutex::new(HashMap::new())),
@@ -533,7 +521,7 @@ impl TopicRouteManager {
{
topics = self.route_table.lock().keys().cloned().collect();
}
- debug!(self.logger, "sync topic route of topics {:?}", topics);
+ debug!("sync topic route of topics {:?}", topics);
for topic in topics {
self.topic_route_inner(rpc_client, &topic).await?;
}
@@ -545,7 +533,7 @@ impl TopicRouteManager {
rpc_client: &mut T,
topics: Vec<String>,
) -> Result<(), ClientError> {
- debug!(self.logger, "sync topic route of topics {:?}.", topics);
+ debug!("sync topic route of topics {:?}.", topics);
for topic in topics {
self.topic_route_inner(rpc_client, topic.as_str()).await?;
}
@@ -557,7 +545,7 @@ impl TopicRouteManager {
rpc_client: &mut T,
topic: &str,
) -> Result<Arc<Route>, ClientError> {
- debug!(self.logger, "query route for topic={}", topic);
+ debug!("query route for topic={}", topic);
let rx = match self
.route_table
.lock()
@@ -604,10 +592,7 @@ impl TopicRouteManager {
// send result to all waiters
if let Ok(route) = result {
- debug!(
- self.logger,
- "query route for topic={} success: route={:?}", topic, route
- );
+ debug!("query route for topic={} success: route={:?}", topic,
route);
let route = Arc::new(route);
let mut route_table_lock = self.route_table.lock();
@@ -620,7 +605,7 @@ impl TopicRouteManager {
let prev =
route_table_lock.insert(topic.to_owned(),
RouteStatus::Found(Arc::clone(&route)));
- info!(self.logger, "update route for topic={}", topic);
+ info!("update route for topic={}", topic);
if let Some(RouteStatus::Querying(Some(mut v))) = prev {
for item in v.drain(..) {
@@ -630,10 +615,7 @@ impl TopicRouteManager {
Ok(route)
} else {
let err = result.unwrap_err();
- warn!(
- self.logger,
- "query route for topic={} failed: error={}", topic, err
- );
+ warn!("query route for topic={} failed: error={}", topic, err);
let mut route_table_lock = self.route_table.lock();
// keep the existing route if error occurs.
if let Some(RouteStatus::Found(prev)) =
route_table_lock.get(topic) {
@@ -704,7 +686,6 @@ impl TopicRouteManager {
#[derive(Debug)]
pub(crate) struct SessionManager {
- logger: Logger,
client_id: String,
option: ClientOption,
session_map: tokio::sync::Mutex<HashMap<String, Session>>,
@@ -712,11 +693,9 @@ pub(crate) struct SessionManager {
#[automock]
impl SessionManager {
- pub(crate) fn new(logger: &Logger, client_id: String, option:
&ClientOption) -> Self {
- let logger = logger.new(o!("component" => "session"));
+ pub(crate) fn new(client_id: String, option: &ClientOption) -> Self {
let session_map = tokio::sync::Mutex::new(HashMap::new());
SessionManager {
- logger,
client_id,
option: option.clone(),
session_map,
@@ -734,13 +713,7 @@ impl SessionManager {
if session_map.contains_key(&endpoint_url) {
Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
} else {
- let mut session = Session::new(
- &self.logger,
- endpoints,
- self.client_id.clone(),
- &self.option,
- )
- .await?;
+ let mut session = Session::new(endpoints, self.client_id.clone(),
&self.option).await?;
session.start(settings, telemetry_command_tx).await?;
let shadow_session = session.shadow_session();
session_map.insert(endpoint_url.clone(), session);
@@ -777,7 +750,6 @@ pub(crate) mod tests {
use crate::client::Client;
use crate::conf::ClientOption;
use crate::error::{ClientError, ErrorKind};
- use crate::log::terminal_logger;
use crate::model::common::{ClientType, Route};
use crate::pb::{
AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse,
Code,
@@ -794,7 +766,6 @@ pub(crate) mod tests {
fn new_route_manager_for_test() -> TopicRouteManager {
TopicRouteManager {
- logger: terminal_logger(),
route_table: Arc::new(Mutex::new(HashMap::new())),
access_endpoints:
Endpoints::from_url("http://localhost:8081").unwrap(),
namespace: "".to_string(),
@@ -803,7 +774,6 @@ pub(crate) mod tests {
fn new_session_manager() -> SessionManager {
SessionManager::new(
- &terminal_logger(),
Client::generate_client_id(),
&ClientOption {
group: Some("group".to_string()),
@@ -814,7 +784,6 @@ pub(crate) mod tests {
fn new_client_for_test() -> Client {
Client {
- logger: terminal_logger(),
option: ClientOption {
group: Some("group".to_string()),
..Default::default()
@@ -832,7 +801,6 @@ pub(crate) mod tests {
fn new_client_with_session_manager(session_manager: SessionManager) ->
Client {
let (tx, _) = mpsc::channel(16);
Client {
- logger: terminal_logger(),
option: ClientOption::default(),
session_manager: Arc::new(session_manager),
route_manager: new_route_manager_for_test(),
@@ -853,11 +821,7 @@ pub(crate) mod tests {
#[test]
fn client_new() -> Result<(), ClientError> {
- Client::new(
- &terminal_logger(),
- ClientOption::default(),
- TelemetryCommand::default(),
- )?;
+ Client::new(ClientOption::default(), TelemetryCommand::default())?;
Ok(())
}
@@ -870,7 +834,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn client_get_session() {
let context = MockSession::new_context();
- context.expect().returning(|_, _, _, _| {
+ context.expect().returning(|_, _, _| {
let mut session = MockSession::default();
session.expect_start().returning(|_, _| Ok(()));
session
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 79d92f60..ec61d1ca 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -123,19 +123,9 @@ impl ClientOption {
}
}
-/// Log format for output.
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub enum LoggingFormat {
- /// Print log in terminal
- Terminal,
- /// Print log in json file
- Json,
-}
-
/// The configuration of [`Producer`].
#[derive(Debug, Clone)]
pub struct ProducerOption {
- logging_format: LoggingFormat,
prefetch_route: bool,
topics: Option<Vec<String>>,
namespace: String,
@@ -146,7 +136,6 @@ pub struct ProducerOption {
impl Default for ProducerOption {
fn default() -> Self {
ProducerOption {
- logging_format: LoggingFormat::Terminal,
prefetch_route: true,
topics: None,
namespace: "".to_string(),
@@ -157,15 +146,6 @@ impl Default for ProducerOption {
}
impl ProducerOption {
- /// Get the logging format of producer
- pub fn logging_format(&self) -> &LoggingFormat {
- &self.logging_format
- }
- /// Set the logging format for producer
- pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
- self.logging_format = logging_format;
- }
-
/// Whether to prefetch route info
pub fn prefetch_route(&self) -> &bool {
&self.prefetch_route
@@ -209,7 +189,6 @@ impl ProducerOption {
/// The configuration of [`SimpleConsumer`].
#[derive(Debug, Clone)]
pub struct SimpleConsumerOption {
- logging_format: LoggingFormat,
consumer_group: String,
prefetch_route: bool,
topics: Option<Vec<String>>,
@@ -221,7 +200,6 @@ pub struct SimpleConsumerOption {
impl Default for SimpleConsumerOption {
fn default() -> Self {
SimpleConsumerOption {
- logging_format: LoggingFormat::Terminal,
consumer_group: "".to_string(),
prefetch_route: true,
topics: None,
@@ -233,15 +211,6 @@ impl Default for SimpleConsumerOption {
}
impl SimpleConsumerOption {
- /// Set the logging format of simple consumer
- pub fn logging_format(&self) -> &LoggingFormat {
- &self.logging_format
- }
- /// set the logging format for simple consumer
- pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
- self.logging_format = logging_format;
- }
-
/// Get the consumer group of simple consumer
pub fn consumer_group(&self) -> &str {
&self.consumer_group
@@ -288,7 +257,6 @@ impl SimpleConsumerOption {
#[derive(Debug, Clone)]
pub struct PushConsumerOption {
- logging_format: LoggingFormat,
consumer_group: String,
namespace: String,
timeout: Duration,
@@ -302,7 +270,6 @@ pub struct PushConsumerOption {
impl Default for PushConsumerOption {
fn default() -> Self {
Self {
- logging_format: LoggingFormat::Terminal,
consumer_group: "".to_string(),
namespace: "".to_string(),
timeout: Duration::from_secs(3),
@@ -364,14 +331,6 @@ impl PushConsumerOption {
self.fifo = fifo;
}
- pub fn logging_format(&self) -> &LoggingFormat {
- &self.logging_format
- }
-
- pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
- self.logging_format = logging_format;
- }
-
pub fn batch_size(&self) -> i32 {
self.batch_size
}
@@ -537,7 +496,6 @@ mod tests {
#[test]
fn conf_producer_option() {
let option = ProducerOption::default();
- assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
assert!(option.validate_message_type());
}
@@ -545,7 +503,6 @@ mod tests {
#[test]
fn conf_simple_consumer_option() {
let option = SimpleConsumerOption::default();
- assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
assert!(option.prefetch_route());
}
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index dc7478a0..c12eba67 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -128,8 +128,6 @@ pub use simple_consumer::SimpleConsumer;
#[allow(dead_code)]
pub mod conf;
pub mod error;
-#[allow(dead_code)]
-mod log;
mod client;
diff --git a/rust/src/log.rs b/rust/src/log.rs
deleted file mode 100644
index db2f4736..00000000
--- a/rust/src/log.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use std::fs::OpenOptions;
-
-use slog::{o, Drain, Logger};
-use slog_async::OverflowStrategy;
-
-use crate::conf::LoggingFormat;
-
-pub(crate) fn terminal_logger() -> Logger {
- let decorator = slog_term::TermDecorator::new().build();
- let drain = slog_term::FullFormat::new(decorator)
- .use_file_location()
- .build()
- .fuse();
- let drain = slog_async::Async::new(drain)
- .overflow_strategy(OverflowStrategy::Block)
- .chan_size(1)
- .build()
- .fuse();
- Logger::root(drain, o!())
-}
-
-fn json_logger(filepath: &str) -> Logger {
- let file = OpenOptions::new()
- .create(true)
- .write(true)
- .truncate(true)
- .open(filepath)
- .unwrap();
- let decorator = slog_json::Json::default(file).fuse();
- let drain = slog_async::Async::new(decorator).build().fuse();
- Logger::root(drain, o!())
-}
-
-pub(crate) fn logger(logging_format: &LoggingFormat) -> Logger {
- match logging_format {
- LoggingFormat::Terminal => terminal_logger(),
- LoggingFormat::Json => json_logger("logs/rocketmq_client.log"),
- }
-}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 94cd295d..8bce38aa 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -22,9 +22,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
use mockall_double::double;
use parking_lot::RwLock;
use prost_types::Timestamp;
-use slog::{error, info, warn, Logger};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
+use tracing::{error, info, warn};
#[double]
use crate::client::Client;
@@ -35,6 +35,7 @@ use crate::model::message::{self, MessageTypeAware,
MessageView};
use crate::model::transaction::{
Transaction, TransactionChecker, TransactionImpl, TransactionResolution,
};
+use crate::pb;
use crate::pb::settings::PubSub;
use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand,
Settings};
use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties,
TransactionSource};
@@ -43,7 +44,6 @@ use crate::util::{
build_endpoints_by_message_queue, build_producer_settings,
handle_response_status,
select_message_queue, select_message_queue_by_message_group, HOST_NAME,
};
-use crate::{log, pb};
/// [`Producer`] is the core struct, to which application developers should
turn, when publishing messages to RocketMQ proxy.
///
@@ -53,7 +53,6 @@ use crate::{log, pb};
/// [`Producer`] is `Send` and `Sync` by design, so that developers may get
started easily.
pub struct Producer {
option: Arc<RwLock<ProducerOption>>,
- logger: Logger,
client: Client,
transaction_checker: Option<Box<TransactionChecker>>,
shutdown_tx: Option<oneshot::Sender<()>>,
@@ -84,11 +83,9 @@ impl Producer {
namespace: option.namespace().to_string(),
..client_option
};
- let logger = log::logger(option.logging_format());
- let client = Client::new(&logger, client_option,
build_producer_settings(&option))?;
+ let client = Client::new(client_option,
build_producer_settings(&option))?;
Ok(Producer {
option: Arc::new(RwLock::new(option)),
- logger,
client,
transaction_checker: None,
shutdown_tx: None,
@@ -112,11 +109,9 @@ impl Producer {
namespace: option.namespace().to_string(),
..client_option
};
- let logger = log::logger(option.logging_format());
- let client = Client::new(&logger, client_option,
build_producer_settings(&option))?;
+ let client = Client::new(client_option,
build_producer_settings(&option))?;
Ok(Producer {
option: Arc::new(RwLock::new(option)),
- logger,
client,
transaction_checker: Some(transaction_checker),
shutdown_tx: None,
@@ -156,7 +151,6 @@ impl Producer {
self.shutdown_tx = Some(shutdown_tx);
let rpc_client = self.client.get_session().await?;
let endpoints = self.client.get_endpoints();
- let logger = self.logger.clone();
let producer_option = Arc::clone(&self.option);
tokio::spawn(async move {
loop {
@@ -171,16 +165,16 @@ impl Producer {
&transaction_checker,
endpoints.clone()).await;
if let Err(error) = result {
- error!(logger, "handle trannsaction
command failed: {:?}", error);
+ error!("handle trannsaction command
failed: {:?}", error);
};
}
Settings(command) => {
let option = &mut producer_option.write();
Self::handle_settings_command(command,
option);
- info!(logger, "handle setting command
success.");
+ info!("handle setting command success.");
}
_ => {
- warn!(logger, "unimplemented command
{:?}", command);
+ warn!("unimplemented command {:?}",
command);
}
}
}
@@ -192,7 +186,6 @@ impl Producer {
}
});
info!(
- self.logger,
"start producer success, client_id: {}",
self.client.client_id()
);
@@ -473,7 +466,6 @@ mod tests {
use std::sync::Arc;
use crate::error::ErrorKind;
- use crate::log::terminal_logger;
use crate::model::common::Route;
use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
use crate::model::transaction::TransactionResolution;
@@ -487,7 +479,6 @@ mod tests {
fn new_producer_for_test() -> Producer {
Producer {
option: Default::default(),
- logger: terminal_logger(),
client: Client::default(),
shutdown_tx: None,
transaction_checker: None,
@@ -497,7 +488,6 @@ mod tests {
fn new_transaction_producer_for_test() -> Producer {
Producer {
option: Default::default(),
- logger: terminal_logger(),
client: Client::default(),
shutdown_tx: None,
transaction_checker: Some(Box::new(|_, _|
TransactionResolution::COMMIT)),
@@ -509,7 +499,7 @@ mod tests {
let _m = crate::client::tests::MTX.lock();
let ctx = Client::new_context();
- ctx.expect().return_once(|_, _, _| {
+ ctx.expect().return_once(|_, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
@@ -542,7 +532,7 @@ mod tests {
let _m = crate::client::tests::MTX.lock();
let ctx = Client::new_context();
- ctx.expect().return_once(|_, _, _| {
+ ctx.expect().return_once(|_, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
diff --git a/rust/src/push_consumer.rs b/rust/src/push_consumer.rs
index 3ccac5cd..d3f9a23a 100644
--- a/rust/src/push_consumer.rs
+++ b/rust/src/push_consumer.rs
@@ -18,8 +18,6 @@
use mockall::automock;
use mockall_double::double;
use parking_lot::{Mutex, RwLock};
-use slog::Logger;
-use slog::{debug, error, info, warn};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
@@ -34,6 +32,7 @@ use crate::conf::{BackOffRetryPolicy, ClientOption,
PushConsumerOption};
use crate::error::{ClientError, ErrorKind};
use crate::model::common::{ClientType, ConsumeResult, MessageQueue};
use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb;
use crate::pb::receive_message_response::Content;
use crate::pb::{
AckMessageRequest, Assignment, ChangeInvisibleDurationRequest,
@@ -46,7 +45,7 @@ use crate::session::Session;
use crate::util::{
build_endpoints_by_message_queue, build_push_consumer_settings,
handle_response_status,
};
-use crate::{log, pb};
+use tracing::{debug, error, info, warn};
const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
@@ -58,7 +57,6 @@ const OPERATION_FORWARD_TO_DEADLETTER_QUEUE: &str =
"push_consumer.forward_to_de
pub type MessageListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send +
Sync>;
pub struct PushConsumer {
- logger: Logger,
client: Client,
message_listener: Arc<MessageListener>,
option: Arc<RwLock<PushConsumerOption>>,
@@ -91,14 +89,8 @@ impl PushConsumer {
group: Some(option.consumer_group().to_string()),
..client_option
};
- let logger = log::logger(option.logging_format());
- let client = Client::new(
- &logger,
- client_option,
- build_push_consumer_settings(&option),
- )?;
+ let client = Client::new(client_option,
build_push_consumer_settings(&option))?;
Ok(Self {
- logger,
client,
message_listener: Arc::new(message_listener),
option: Arc::new(RwLock::new(option)),
@@ -125,7 +117,6 @@ impl PushConsumer {
route_manager
.sync_topic_routes(&mut rpc_client, topics)
.await?;
- let logger = self.logger.clone();
let mut actor_table: HashMap<MessageQueue, MessageQueueActor> =
HashMap::new();
let message_listener = Arc::clone(&self.message_listener);
@@ -155,7 +146,7 @@ impl PushConsumer {
option_retry_policy = retry_policy.lock().clone();
}
if option_retry_policy.is_none() {
- warn!(logger, "retry policy is not set. skip
scanning.");
+ warn!("retry policy is not set. skip scanning.");
continue;
}
let retry_policy_inner = option_retry_policy.unwrap();
@@ -181,7 +172,7 @@ impl PushConsumer {
let result =
rpc_client.query_assignment(request).await;
if let Ok(response) = result {
if handle_response_status(response.status,
OPERATION_START_PUSH_CONSUMER).is_ok() {
- let result =
Self::process_assignments(logger.clone(),
+ let result =
Self::process_assignments(
&rpc_client,
&consumer_option,
Arc::clone(&message_listener),
@@ -190,20 +181,20 @@ impl PushConsumer {
retry_policy_inner.clone(),
).await;
if result.is_err() {
- error!(logger, "process
assignments failed: {:?}", result.unwrap_err());
+ error!("process assignments
failed: {:?}", result.unwrap_err());
}
} else {
- error!(logger, "query assignment
failed, no status in response.");
+ error!("query assignment failed, no
status in response.");
}
} else {
- error!(logger, "query assignment failed:
{:?}", result.unwrap_err());
+ error!("query assignment failed: {:?}",
result.unwrap_err());
}
}
}
}
_ = shutdown_token.cancelled() => {
let entries = actor_table.drain();
- info!(logger, "shutdown {:?} actors", entries.len());
+ info!("shutdown {:?} actors", entries.len());
for (_, actor) in entries {
let _ = actor.shutdown().await;
}
@@ -216,7 +207,6 @@ impl PushConsumer {
}
async fn process_assignments(
- logger: Logger,
rpc_client: &Session,
option: &PushConsumerOption,
message_listener: Arc<MessageListener>,
@@ -261,7 +251,6 @@ impl PushConsumer {
}
let option = option.clone();
let mut actor = MessageQueueActor::new(
- logger.clone(),
rpc_client.shadow_session(),
message_queue.clone(),
option,
@@ -290,7 +279,6 @@ impl PushConsumer {
async fn receive_messages<T: RPCClient + 'static>(
rpc_client: &mut T,
- logger: &Logger,
message_queue: &MessageQueue,
option: &PushConsumerOption,
) -> Result<Vec<MessageView>, ClientError> {
@@ -326,10 +314,10 @@ impl PushConsumer {
let content = response.content.unwrap();
match content {
Content::Status(status) => {
- warn!(logger, "unhandled status message {:?}", status);
+ warn!("unhandled status message {:?}", status);
}
Content::DeliveryTimestamp(_) => {
- warn!(logger, "unhandled delivery timestamp message");
+ warn!("unhandled delivery timestamp message");
}
Content::Message(message) => {
messages.push(
@@ -424,7 +412,6 @@ impl DlqEntry {
}
struct MessageQueueActor {
- logger: Logger,
rpc_client: Session,
message_queue: MessageQueue,
option: PushConsumerOption,
@@ -437,7 +424,6 @@ struct MessageQueueActor {
#[automock]
impl MessageQueueActor {
pub(crate) fn new(
- logger: Logger,
rpc_client: Session,
message_queue: MessageQueue,
option: PushConsumerOption,
@@ -445,7 +431,6 @@ impl MessageQueueActor {
retry_policy: BackOffRetryPolicy,
) -> Self {
Self {
- logger,
rpc_client,
message_queue,
option,
@@ -466,10 +451,7 @@ impl MessageQueueActor {
}
pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
- debug!(
- self.logger,
- "start a new queue actor {:?}", self.message_queue
- );
+ debug!("start a new queue actor {:?}", self.message_queue);
let shutdown_token = CancellationToken::new();
self.shutdown_token = Some(shutdown_token.clone());
let task_tracker = TaskTracker::new();
@@ -479,23 +461,18 @@ impl MessageQueueActor {
if self.option.fifo() {
consumer_worker_count = 1;
}
- info!(
- self.logger,
- "start consumer worker count: {}", consumer_worker_count
- );
+ info!("start consumer worker count: {}", consumer_worker_count);
for _ in 0..consumer_worker_count {
// create consumer worker
let mut consumer_worker: ConsumerWorker;
if self.option.fifo() {
consumer_worker = ConsumerWorker::Fifo(FifoConsumerWorker::new(
- self.logger.clone(),
self.rpc_client.shadow_session(),
Arc::clone(&self.message_listener),
));
} else {
consumer_worker =
ConsumerWorker::Standard(StandardConsumerWorker::new(
- self.logger.clone(),
self.rpc_client.shadow_session(),
Arc::clone(&self.message_listener),
));
@@ -505,21 +482,19 @@ impl MessageQueueActor {
let option = self.option.clone();
let retry_policy = self.retry_policy.clone();
let mut ack_processor = AckEntryProcessor::new(
- self.logger.clone(),
self.rpc_client.shadow_session(),
self.option.get_consumer_group_resource(),
self.message_queue.topic.to_owned(),
);
ack_processor.start().await?;
let shutdown_token = shutdown_token.clone();
- let logger = self.logger.clone();
task_tracker.spawn(async move {
loop {
select! {
_ = ticker.tick() => {
let result =
consumer_worker.receive_messages(&message_queue, &option, &mut ack_processor,
&retry_policy).await;
if result.is_err() {
- error!(logger, "receive messages error: {:?}",
result.err());
+ error!("receive messages error: {:?}",
result.err());
}
}
_ = shutdown_token.cancelled() => {
@@ -534,7 +509,7 @@ impl MessageQueueActor {
}
pub(crate) async fn shutdown(mut self) -> Result<(), ClientError> {
- debug!(self.logger, "shutdown queue actor {:?}", self.message_queue);
+ debug!("shutdown queue actor {:?}", self.message_queue);
if let Some(shutdown_token) = self.shutdown_token.take() {
shutdown_token.cancel();
}
@@ -575,15 +550,13 @@ impl ConsumerWorker {
}
struct StandardConsumerWorker {
- logger: Logger,
rpc_client: Session,
message_listener: Arc<MessageListener>,
}
impl StandardConsumerWorker {
- fn new(logger: Logger, rpc_client: Session, message_listener:
Arc<MessageListener>) -> Self {
+ fn new(rpc_client: Session, message_listener: Arc<MessageListener>) ->
Self {
Self {
- logger,
rpc_client,
message_listener,
}
@@ -596,13 +569,8 @@ impl StandardConsumerWorker {
ack_processor: &mut AckEntryProcessor,
retry_policy: &BackOffRetryPolicy,
) -> Result<(), ClientError> {
- let messages = PushConsumer::receive_messages(
- &mut self.rpc_client,
- &self.logger,
- message_queue,
- option,
- )
- .await?;
+ let messages =
+ PushConsumer::receive_messages(&mut self.rpc_client,
message_queue, option).await?;
for message in messages {
let consume_result = (self.message_listener)(&message);
match consume_result {
@@ -623,15 +591,13 @@ impl StandardConsumerWorker {
}
struct FifoConsumerWorker {
- logger: Logger,
rpc_client: Session,
message_listener: Arc<MessageListener>,
}
impl FifoConsumerWorker {
- fn new(logger: Logger, rpc_client: Session, message_listener:
Arc<MessageListener>) -> Self {
+ fn new(rpc_client: Session, message_listener: Arc<MessageListener>) ->
Self {
Self {
- logger,
rpc_client,
message_listener,
}
@@ -644,13 +610,8 @@ impl FifoConsumerWorker {
ack_processor: &mut AckEntryProcessor,
retry_policy: &BackOffRetryPolicy,
) -> Result<(), ClientError> {
- let messages = PushConsumer::receive_messages(
- &mut self.rpc_client,
- &self.logger,
- message_queue,
- option,
- )
- .await?;
+ let messages =
+ PushConsumer::receive_messages(&mut self.rpc_client,
message_queue, option).await?;
for message in messages {
let mut delivery_attempt = message.delivery_attempt();
let max_delivery_attempts = retry_policy.get_max_attempts();
@@ -688,7 +649,6 @@ impl FifoConsumerWorker {
}
struct AckEntryProcessor {
- logger: Logger,
rpc_client: Session,
consumer_group: Resource,
topic: Resource,
@@ -698,9 +658,8 @@ struct AckEntryProcessor {
}
impl AckEntryProcessor {
- fn new(logger: Logger, rpc_client: Session, consumer_group: Resource,
topic: Resource) -> Self {
+ fn new(rpc_client: Session, consumer_group: Resource, topic: Resource) ->
Self {
Self {
- logger,
rpc_client,
consumer_group,
topic,
@@ -712,7 +671,6 @@ impl AckEntryProcessor {
fn shadow_self(&self) -> Self {
Self {
- logger: self.logger.clone(),
rpc_client: self.rpc_client.shadow_session(),
consumer_group: self.consumer_group.clone(),
topic: self.topic.clone(),
@@ -730,17 +688,10 @@ impl AckEntryProcessor {
.send(AckEntryItem::Ack(AckEntry::new(message)))
.await;
if send_result.is_err() {
- error!(
- self.logger,
- "put ack entry to queue error {:?}",
- send_result.err()
- );
+ error!("put ack entry to queue error {:?}",
send_result.err());
}
} else {
- error!(
- self.logger,
- "The ack entry sender is not set. Drop the ack message."
- );
+ error!("The ack entry sender is not set. Drop the ack message.");
}
}
}
@@ -762,17 +713,10 @@ impl AckEntryProcessor {
)))
.await;
if send_result.is_err() {
- error!(
- self.logger,
- "put nack entry to queue error {:?}",
- send_result.err()
- );
+ error!("put nack entry to queue error {:?}",
send_result.err());
}
} else {
- error!(
- self.logger,
- "Drop the nack message due to ack entry sender is not set."
- )
+ error!("Drop the nack message due to ack entry sender is not set.")
}
}
}
@@ -796,17 +740,10 @@ impl AckEntryProcessor {
)))
.await;
if send_result.is_err() {
- error!(
- self.logger,
- "put dlq entry to queue error {:?}",
- send_result.err()
- );
+ error!("put dlq entry to queue error {:?}",
send_result.err());
}
} else {
- error!(
- self.logger,
- "Drop the dlq message due to ack edntry sender is not set."
- );
+ error!("Drop the dlq message due to ack edntry sender is not set.");
}
}
}
@@ -869,15 +806,15 @@ impl AckEntryProcessor {
_ = ack_ticker.tick() => {
let result = processor.process_ack_entry_queue(&mut
ack_entry_queue).await;
if result.is_err() {
- error!(processor.logger, "process ack entry queue failed: {:?}", result);
+ error!("process ack entry queue failed: {:?}",
result);
}
}
Some(ack_entry) = ack_entry_receiver.recv() => {
ack_entry_queue.push_back(ack_entry);
- debug!(processor.logger, "ack entry queue size: {}",
ack_entry_queue.len());
+ debug!("ack entry queue size: {}",
ack_entry_queue.len());
}
_ = shutdown_token.cancelled() => {
- info!(processor.logger, "need to process remaining {} entries on shutdown.", ack_entry_queue.len());
+ info!("need to process remaining {} entries on shutdown.", ack_entry_queue.len());
processor.flush_ack_entry_queue(&mut
ack_entry_queue).await;
break;
}
@@ -913,10 +850,7 @@ impl AckEntryProcessor {
if result.is_ok() {
ack_entry_queue.pop_front();
} else {
- error!(
- self.logger,
- "ack message failed: {:?}, will deliver later.", result
- );
+ error!("ack message failed: {:?}, will deliver later.",
result);
ack_entry_item.inc_attempt();
}
}
@@ -965,15 +899,12 @@ impl AckEntryProcessor {
shutdown_token.cancel();
}
if let Some(task_tracker) = self.task_tracker.take() {
- info!(self.logger, "waiting for task to complete");
+ info!("waiting for task to complete");
task_tracker.close();
task_tracker.wait().await;
- info!(self.logger, "task completed");
+ info!("task completed");
}
- info!(
- self.logger,
- "The ack entry processor shuts down completely."
- );
+ info!("The ack entry processor shuts down completely.");
}
}
@@ -982,7 +913,6 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{str::FromStr, vec};
- use log::terminal_logger;
use pb::{
AckMessageResponse, Address, Broker, ChangeInvisibleDurationResponse,
Code,
ForwardMessageToDeadLetterQueueResponse, QueryRouteResponse,
ReceiveMessageResponse,
@@ -1021,7 +951,7 @@ mod tests {
option2.set_consumer_group("test");
option2.subscribe("test_topic", FilterExpression::new(FilterType::Tag,
"*"));
let context = Client::new_context();
- context.expect().returning(|_, _, _| Ok(Client::default()));
+ context.expect().returning(|_, _| Ok(Client::default()));
let result3 = PushConsumer::new(
ClientOption::default(),
option2,
@@ -1061,12 +991,11 @@ mod tests {
});
client.expect_get_route_manager().returning(|| {
TopicRouteManager::new(
- terminal_logger(),
"".to_string(),
Endpoints::from_url("http://localhost:8081").unwrap(),
)
});
- context.expect().return_once(|_, _, _| Ok(client));
+ context.expect().return_once(|_, _| Ok(client));
let mut push_consumer_option = PushConsumerOption::default();
push_consumer_option.set_consumer_group("test_group");
push_consumer_option.subscribe("test_topic",
FilterExpression::new(FilterType::Tag, "*"));
@@ -1084,7 +1013,6 @@ mod tests {
#[tokio::test]
async fn test_process_assignments_invalid_assignment() -> Result<(),
ClientError> {
let rpc_client = Session::default();
- let logger = terminal_logger();
let option = &PushConsumerOption::default();
let message_listener: Arc<MessageListener> = Arc::new(Box::new(|_|
ConsumeResult::SUCCESS));
let mut actor_table: HashMap<MessageQueue, MessageQueueActor> =
HashMap::new();
@@ -1100,13 +1028,12 @@ mod tests {
}),
}];
let context = MockMessageQueueActor::new_context();
- context.expect().returning(|_, _, _, _, _, _| {
+ context.expect().returning(|_, _, _, _, _| {
let mut actor = MockMessageQueueActor::default();
actor.expect_start().returning(|| Ok(()));
return actor;
});
PushConsumer::process_assignments(
- logger,
&rpc_client,
option,
message_listener,
@@ -1121,7 +1048,6 @@ mod tests {
#[tokio::test]
async fn test_process_assignments() -> Result<(), ClientError> {
- let logger = terminal_logger();
let option = &PushConsumerOption::default();
let message_listener: Arc<MessageListener> = Arc::new(Box::new(|_|
ConsumeResult::SUCCESS));
@@ -1163,7 +1089,6 @@ mod tests {
}),
}];
PushConsumer::process_assignments(
- logger,
&rpc_client,
option,
message_listener,
@@ -1178,7 +1103,6 @@ mod tests {
#[tokio::test]
async fn test_process_assignments_two_queues() -> Result<(), ClientError> {
- let logger = terminal_logger();
let option = &PushConsumerOption::default();
let message_listener: Arc<MessageListener> = Arc::new(Box::new(|_|
ConsumeResult::SUCCESS));
let mut actor_table: HashMap<MessageQueue, MessageQueueActor> =
HashMap::new();
@@ -1242,7 +1166,6 @@ mod tests {
mock
});
PushConsumer::process_assignments(
- logger.clone(),
&session,
option,
message_listener,
@@ -1274,7 +1197,6 @@ mod tests {
}),
}];
PushConsumer::process_assignments(
- logger,
&session,
option,
Arc::new(Box::new(|_| ConsumeResult::SUCCESS)),
@@ -1365,7 +1287,6 @@ mod tests {
mock
});
let mut actor = MessageQueueActor::new(
- terminal_logger(),
session,
message_queue,
option,
@@ -1399,7 +1320,6 @@ mod tests {
mock
});
let mut actor = MessageQueueActor::new(
- terminal_logger(),
session,
message_queue,
option,
@@ -1437,8 +1357,7 @@ mod tests {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
};
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), rpc_client,
consumer_group, topic);
+ let mut ack_processor = AckEntryProcessor::new(rpc_client,
consumer_group, topic);
let result = ack_processor.start().await;
assert!(result.is_ok());
@@ -1487,8 +1406,7 @@ mod tests {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
};
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), rpc_client,
consumer_group, topic);
+ let mut ack_processor = AckEntryProcessor::new(rpc_client,
consumer_group, topic);
let result = ack_processor.start().await;
assert!(result.is_ok());
@@ -1534,8 +1452,7 @@ mod tests {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
};
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), rpc_client,
consumer_group, topic);
+ let mut ack_processor = AckEntryProcessor::new(rpc_client,
consumer_group, topic);
let result = ack_processor.start().await;
assert!(result.is_ok());
@@ -1590,8 +1507,7 @@ mod tests {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
};
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), rpc_client,
consumer_group, topic);
+ let mut ack_processor = AckEntryProcessor::new(rpc_client,
consumer_group, topic);
let result = ack_processor.start().await;
assert!(result.is_ok());
@@ -1637,8 +1553,7 @@ mod tests {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
};
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), rpc_client,
consumer_group, topic);
+ let mut ack_processor = AckEntryProcessor::new(rpc_client,
consumer_group, topic);
let result = ack_processor.start().await;
assert!(result.is_ok());
@@ -1692,8 +1607,7 @@ mod tests {
name: "test_topic".to_string(),
resource_namespace: "".to_string(),
};
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), rpc_client,
consumer_group, topic);
+ let mut ack_processor = AckEntryProcessor::new(rpc_client,
consumer_group, topic);
let result = ack_processor.start().await;
assert!(result.is_ok());
@@ -1721,11 +1635,8 @@ mod tests {
content: Some(Content::Message(new_message())),
}])
});
- let consumer_worker = StandardConsumerWorker::new(
- terminal_logger(),
- rpc_client,
- Arc::new(Box::new(|_| ConsumeResult::SUCCESS)),
- );
+ let consumer_worker =
+ StandardConsumerWorker::new(rpc_client, Arc::new(Box::new(|_|
ConsumeResult::SUCCESS)));
let consumer_group = Resource {
name: "test_group".to_string(),
resource_namespace: "".to_string(),
@@ -1757,8 +1668,7 @@ mod tests {
entries: vec![],
})
});
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), session, consumer_group,
topic);
+ let mut ack_processor = AckEntryProcessor::new(session,
consumer_group, topic);
let is_start_ok = ack_processor.start().await;
assert!(is_start_ok.is_ok());
let result = ConsumerWorker::Standard(consumer_worker)
@@ -1780,11 +1690,8 @@ mod tests {
content: Some(Content::Message(new_message())),
}])
});
- let consumer_worker = StandardConsumerWorker::new(
- terminal_logger(),
- rpc_client,
- Arc::new(Box::new(|_| ConsumeResult::FAILURE)),
- );
+ let consumer_worker =
+ StandardConsumerWorker::new(rpc_client, Arc::new(Box::new(|_|
ConsumeResult::FAILURE)));
let consumer_group = Resource {
name: "test_group".to_string(),
resource_namespace: "".to_string(),
@@ -1820,8 +1727,7 @@ mod tests {
receipt_handle: "".to_string(),
})
});
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), session, consumer_group,
topic);
+ let mut ack_processor = AckEntryProcessor::new(session,
consumer_group, topic);
let is_start_ok = ack_processor.start().await;
assert!(is_start_ok.is_ok());
let result = ConsumerWorker::Standard(consumer_worker)
@@ -1843,11 +1749,8 @@ mod tests {
content: Some(Content::Message(new_message())),
}])
});
- let consumer_worker = FifoConsumerWorker::new(
- terminal_logger(),
- rpc_client,
- Arc::new(Box::new(|_| ConsumeResult::SUCCESS)),
- );
+ let consumer_worker =
+ FifoConsumerWorker::new(rpc_client, Arc::new(Box::new(|_|
ConsumeResult::SUCCESS)));
let consumer_group = Resource {
name: "test_group".to_string(),
resource_namespace: "".to_string(),
@@ -1877,8 +1780,7 @@ mod tests {
entries: vec![],
})
});
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), session, consumer_group,
topic);
+ let mut ack_processor = AckEntryProcessor::new(session,
consumer_group, topic);
let result = ConsumerWorker::Fifo(consumer_worker)
.receive_messages(
&new_message_queue(),
@@ -1901,7 +1803,6 @@ mod tests {
let receive_call_count = Arc::new(AtomicUsize::new(0));
let outer_call_counter = Arc::clone(&receive_call_count);
let consumer_worker = FifoConsumerWorker::new(
- terminal_logger(),
rpc_client,
Arc::new(Box::new(move |_| {
receive_call_count.fetch_add(1, Ordering::Relaxed);
@@ -1940,8 +1841,7 @@ mod tests {
}),
})
});
- let mut ack_processor =
- AckEntryProcessor::new(terminal_logger(), session, consumer_group,
topic);
+ let mut ack_processor = AckEntryProcessor::new(session,
consumer_group, topic);
let result = ConsumerWorker::Fifo(consumer_worker)
.receive_messages(
&new_message_queue(),
diff --git a/rust/src/session.rs b/rust/src/session.rs
index ebd0687b..f9a08154 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,7 +18,6 @@
use async_trait::async_trait;
use mockall::{automock, mock};
use ring::hmac;
-use slog::{debug, error, info, o, Logger};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot};
@@ -42,6 +41,7 @@ use crate::pb::{
};
use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
use crate::{error::ClientError,
pb::messaging_service_client::MessagingServiceClient};
+use tracing::{debug, error, info};
const OPERATION_START: &str = "session.start";
const OPERATION_UPDATE_SETTINGS: &str = "session.update_settings";
@@ -104,7 +104,6 @@ pub(crate) trait RPCClient {
#[derive(Debug)]
pub(crate) struct Session {
- logger: Logger,
client_id: String,
option: ClientOption,
endpoints: Endpoints,
@@ -122,7 +121,6 @@ impl Session {
pub(crate) fn shadow_session(&self) -> Self {
Session {
- logger: self.logger.clone(),
client_id: self.client_id.clone(),
option: self.option.clone(),
endpoints: self.endpoints.clone(),
@@ -134,7 +132,6 @@ impl Session {
}
pub(crate) async fn new(
- logger: &Logger,
endpoints: &Endpoints,
client_id: String,
option: &ClientOption,
@@ -171,14 +168,12 @@ impl Session {
let stub = MessagingServiceClient::new(channel);
- let logger = logger.new(o!("peer" => peer.clone()));
- info!(logger, "create session success");
+ info!("create session success");
Ok(Session {
- logger,
+ client_id,
option: option.clone(),
endpoints: endpoints.clone(),
- client_id,
stub,
telemetry_tx: None,
telemetry_command_tx: None,
@@ -293,11 +288,8 @@ impl Session {
self.establish_telemetry_stream(settings).await?;
- debug!(
- self.logger,
- "telemetry_command_tx: {:?}", self.telemetry_command_tx
- );
- info!(self.logger, "starting client success");
+ debug!("telemetry_command_tx: {:?}", self.telemetry_command_tx);
+ info!("starting client success");
Ok(())
}
@@ -308,10 +300,7 @@ impl Session {
if let Some(old_shutdown_tx) = self.shutdown_tx.take() {
// a `send` error means the receiver is already gone, which is fine.
let _ = old_shutdown_tx.send(());
- info!(
- self.logger,
- "sent shutdown signal to the previous telemetry stream."
- );
+ info!("sent shutdown signal to the previous telemetry stream.");
}
let (tx, rx) = mpsc::channel(16);
@@ -340,7 +329,6 @@ impl Session {
self.shutdown_tx = Some(shutdown_tx);
self.telemetry_tx = Some(tx);
- let logger = self.logger.clone();
let telemetry_command_tx =
self.telemetry_command_tx.as_ref().unwrap().clone();
tokio::spawn(async move {
let mut stream = response.into_inner();
@@ -349,30 +337,30 @@ impl Session {
message = stream.message() => {
match message {
Ok(Some(item)) => {
- debug!(logger, "receive telemetry command: {:?}", item);
+ debug!("receive telemetry command: {:?}", item);
if let Some(command) = item.command {
_ =
telemetry_command_tx.send(command).await;
}
}
Ok(None) => {
- info!(logger, "telemetry command stream closed by server");
+ info!("telemetry command stream closed by server");
break;
}
Err(e) => {
- error!(logger, "telemetry response error: {:?}", e);
+ error!("telemetry response error: {:?}", e);
break;
}
}
}
_ = &mut shutdown_rx => {
- info!(logger, "receive shutdown signal, stop dealing with telemetry command");
+ info!("receive shutdown signal, stop dealing with telemetry command");
break;
}
}
}
// telemetry tx will be dropped here
});
- debug!(self.logger, "start session success");
+ debug!("start session success");
Ok(())
}
@@ -392,10 +380,7 @@ impl Session {
settings: TelemetryCommand,
) -> Result<(), ClientError> {
if self.is_started() {
- debug!(
- self.logger,
- "session is already started: {:?}", self.telemetry_tx
- );
+ debug!("session is already started: {:?}", self.telemetry_tx);
if let Some(tx) = self.telemetry_tx.as_ref() {
tx.send(settings).await.map_err(|e| {
ClientError::new(
@@ -408,10 +393,9 @@ impl Session {
}
Ok(())
} else {
- debug!(self.logger, "session is closed: {:?}", self.telemetry_tx);
+ debug!("session is closed: {:?}", self.telemetry_tx);
self.establish_telemetry_stream(settings).await?;
debug!(
- self.logger,
"session is established, closed: {:?}",
self.telemetry_tx.as_ref().unwrap().is_closed()
);
@@ -636,7 +620,6 @@ mock! {
pub(crate) Session {
pub(crate) fn shadow_session(&self) -> Self;
pub(crate) async fn new(
- logger: &Logger,
endpoints: &Endpoints,
client_id: String,
option: &ClientOption,
@@ -703,14 +686,8 @@ mock! {
#[cfg(test)]
mod tests {
- use mockall::predicate::always;
- use slog::debug;
use wiremock_grpc::generate;
- use crate::conf::ProducerOption;
- use crate::log::terminal_logger;
- use crate::util::build_producer_settings;
-
use super::*;
generate!("apache.rocketmq.v2", RocketMQMockServer);
@@ -718,31 +695,25 @@ mod tests {
#[tokio::test]
async fn session_new() {
let server = RocketMQMockServer::start_default().await;
- let logger = terminal_logger();
- let mut client_option = ClientOption::default();
- client_option.set_enable_tls(false);
let session = Session::new(
- &logger,
&Endpoints::from_url(&format!("localhost:{}",
server.address().port())).unwrap(),
"test_client".to_string(),
- &client_option,
+ &ClientOption::default(),
)
.await;
- debug!(logger, "session: {:?}", session);
+ debug!("session: {:?}", session);
assert!(session.is_ok());
}
#[tokio::test]
async fn session_new_multi_addr() {
- let logger = terminal_logger();
let session = Session::new(
- &logger,
&Endpoints::from_url("127.0.0.1:8080,127.0.0.1:8081").unwrap(),
"test_client".to_string(),
&ClientOption::default(),
)
.await;
- debug!(logger, "session: {:?}", session);
+ debug!("session: {:?}", session);
assert!(session.is_ok());
}
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index cc15949d..c78bed81 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -18,9 +18,9 @@
use std::time::Duration;
use mockall_double::double;
-use slog::{info, warn, Logger};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
+use tracing::{info, warn};
#[double]
use crate::client::Client;
@@ -31,7 +31,6 @@ use crate::model::message::{AckMessageEntry, MessageView};
use crate::util::{
build_endpoints_by_message_queue, build_simple_consumer_settings,
select_message_queue,
};
-use crate::{log, pb};
/// [`SimpleConsumer`] is a lightweight consumer to consume messages from RocketMQ proxy.
///
@@ -45,7 +44,6 @@ use crate::{log, pb};
#[derive(Debug)]
pub struct SimpleConsumer {
option: SimpleConsumerOption,
- logger: Logger,
client: Client,
shutdown_tx: Option<oneshot::Sender<()>>,
}
@@ -74,15 +72,9 @@ impl SimpleConsumer {
namespace: option.namespace().to_string(),
..client_option
};
- let logger = log::logger(option.logging_format());
- let client = Client::new(
- &logger,
- client_option,
- build_simple_consumer_settings(&option),
- )?;
+ let client = Client::new(client_option,
build_simple_consumer_settings(&option))?;
Ok(SimpleConsumer {
option,
- logger,
client,
shutdown_tx: None,
})
@@ -106,12 +98,11 @@ impl SimpleConsumer {
}
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
self.shutdown_tx = Some(shutdown_tx);
- let logger = self.logger.clone();
tokio::spawn(async move {
loop {
select! {
command = telemetry_command_rx.recv() => {
- warn!(logger, "command {:?} cannot be handled in simple consumer.", command);
+ warn!("command {:?} cannot be handled in simple consumer.", command);
}
_ = &mut shutdown_rx => {
@@ -121,7 +112,6 @@ impl SimpleConsumer {
}
});
info!(
- self.logger,
"start simple consumer success, client_id: {}",
self.client.client_id()
);
@@ -174,7 +164,7 @@ impl SimpleConsumer {
.receive_message(
&endpoints,
message_queue,
- pb::FilterExpression {
+ crate::pb::FilterExpression {
r#type: expression.filter_type() as i32,
expression: expression.expression().to_string(),
},
@@ -221,10 +211,9 @@ impl SimpleConsumer {
mod tests {
use std::sync::Arc;
- use crate::log::terminal_logger;
use crate::model::common::{FilterType, Route};
use crate::pb::{
- AckMessageResultEntry, Broker, Message, MessageQueue, Resource,
SystemProperties,
+ AckMessageResultEntry, Broker, Endpoints, Message, MessageQueue,
Resource, SystemProperties,
};
use super::*;
@@ -234,7 +223,7 @@ mod tests {
let _m = crate::client::tests::MTX.lock();
let ctx = Client::new_context();
- ctx.expect().return_once(|_, _, _| {
+ ctx.expect().return_once(|_, _| {
let mut client = Client::default();
client.expect_topic_route().returning(|_, _| {
Ok(Arc::new(Route {
@@ -273,7 +262,7 @@ mod tests {
broker: Some(Broker {
name: "".to_string(),
id: 0,
- endpoints: Some(pb::Endpoints {
+ endpoints: Some(Endpoints {
scheme: 0,
addresses: vec![],
}),
@@ -297,7 +286,6 @@ mod tests {
.returning(|_: &MessageView| Ok(AckMessageResultEntry::default()));
let simple_consumer = SimpleConsumer {
option: SimpleConsumerOption::default(),
- logger: terminal_logger(),
client,
shutdown_tx: None,
};