This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 759aa42d [ISSUE #1040] [Rust] Migrate from slog to tracing logging 
framework
759aa42d is described below

commit 759aa42d38052210b1465f6f997136d32c106788
Author: Adam Basfop Cavendish <[email protected]>
AuthorDate: Wed Jul 23 15:52:57 2025 +0800

    [ISSUE #1040] [Rust] Migrate from slog to tracing logging framework
    
    - Replace slog dependency with tracing for improved logging capabilities
    - Remove custom log.rs module and conf.rs logging configuration
    - Update all modules (client, producer, consumers, session) to use tracing
    - Simplify logging setup and improve observability
---
 rust/Cargo.toml                       |  12 +-
 rust/examples/delay_producer.rs       |   2 +
 rust/examples/fifo_producer.rs        |   2 +
 rust/examples/producer.rs             |   2 +
 rust/examples/push_consumer.rs        |   2 +
 rust/examples/simple_consumer.rs      |   2 +
 rust/examples/transaction_producer.rs |   2 +
 rust/src/client.rs                    |  80 ++++---------
 rust/src/conf.rs                      |  43 -------
 rust/src/lib.rs                       |   2 -
 rust/src/log.rs                       |  55 ---------
 rust/src/producer.rs                  |  28 ++---
 rust/src/push_consumer.rs             | 208 +++++++++-------------------------
 rust/src/session.rs                   |  61 +++-------
 rust/src/simple_consumer.rs           |  26 ++---
 15 files changed, 125 insertions(+), 402 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 692ab3f5..e55a0bba 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -46,10 +46,7 @@ parking_lot = "0.12"
 hostname = "0.3.1"
 os_info = "3"
 
-slog = { version = "2.7.0", features = ["max_level_trace", 
"release_max_level_info"] }
-slog-term = "2.9.0"
-slog-async = "2.7.0"
-slog-json = "2.6.1"
+tracing = "0.1.41"
 
 opentelemetry = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
 opentelemetry-otlp = { version = "0.12.0", features = ["metrics", 
"grpc-tonic"] }
@@ -58,7 +55,7 @@ minitrace = "0.4"
 byteorder = "1"
 mac_address = "1.1.4"
 hex = "0.4.3"
-time = { version = "0.3", features = ["local-offset"] }
+time = { version = "0.3", features = ["formatting", "local-offset"] }
 once_cell = "1.18.0"
 
 mockall = "0.11.4"
@@ -75,9 +72,10 @@ version_check = "0.9.4"
 regex = "1.7.3"
 
 [dev-dependencies]
-wiremock-grpc = "0.0.3-alpha2"
-futures = "0.3"
 awaitility = "0.3.0"
+futures = "0.3"
+tracing-subscriber = "0.3.19"
+wiremock-grpc = "0.0.3-alpha2"
 
 [features]
 default = ["example_ack"]
diff --git a/rust/examples/delay_producer.rs b/rust/examples/delay_producer.rs
index cca3baea..87c9b05b 100644
--- a/rust/examples/delay_producer.rs
+++ b/rust/examples/delay_producer.rs
@@ -23,6 +23,8 @@ use rocketmq::Producer;
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt::init();
+
     // It's recommended to specify the topics that applications will publish 
messages to
     // because the producer will prefetch topic routes for them on start and 
fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
diff --git a/rust/examples/fifo_producer.rs b/rust/examples/fifo_producer.rs
index 211ae683..5f0b5078 100644
--- a/rust/examples/fifo_producer.rs
+++ b/rust/examples/fifo_producer.rs
@@ -20,6 +20,8 @@ use rocketmq::Producer;
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt::init();
+
     // It's recommended to specify the topics that applications will publish 
messages to
     // because the producer will prefetch topic routes for them on start and 
fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
diff --git a/rust/examples/producer.rs b/rust/examples/producer.rs
index 7179c9d2..e92c10be 100644
--- a/rust/examples/producer.rs
+++ b/rust/examples/producer.rs
@@ -20,6 +20,8 @@ use rocketmq::Producer;
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt::init();
+
     // It's recommended to specify the topics that applications will publish 
messages to
     // because the producer will prefetch topic routes for them on start and 
fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
diff --git a/rust/examples/push_consumer.rs b/rust/examples/push_consumer.rs
index 69a6a3ac..3de2691c 100644
--- a/rust/examples/push_consumer.rs
+++ b/rust/examples/push_consumer.rs
@@ -24,6 +24,8 @@ use tokio::time;
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt::init();
+
     let mut client_option = ClientOption::default();
     client_option.set_access_url("localhost:8081");
     client_option.set_enable_tls(false);
diff --git a/rust/examples/simple_consumer.rs b/rust/examples/simple_consumer.rs
index d9bb06da..8079432f 100644
--- a/rust/examples/simple_consumer.rs
+++ b/rust/examples/simple_consumer.rs
@@ -23,6 +23,8 @@ use rocketmq::SimpleConsumer;
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt::init();
+
     // It's recommended to specify the topics that applications will publish 
messages to
     // because the simple consumer will prefetch topic routes for them on 
start and fail fast in case they do not exist
     let mut consumer_option = SimpleConsumerOption::default();
diff --git a/rust/examples/transaction_producer.rs 
b/rust/examples/transaction_producer.rs
index 4bc621fa..4d8c88d3 100644
--- a/rust/examples/transaction_producer.rs
+++ b/rust/examples/transaction_producer.rs
@@ -27,6 +27,8 @@ static MESSAGE_ID_SET: Lazy<Mutex<HashSet<String>>> = 
Lazy::new(|| Mutex::new(Ha
 
 #[tokio::main]
 async fn main() {
+    tracing_subscriber::fmt::init();
+
     // It's recommended to specify the topics that applications will publish 
messages to
     // because the producer will prefetch topic routes for them on start and 
fail fast in case they do not exist
     let mut producer_option = ProducerOption::default();
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 3b286fc4..39686821 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -24,10 +24,10 @@ use mockall_double::double;
 use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use prost_types::Duration;
-use slog::{debug, error, info, o, warn, Logger};
 use tokio::select;
 use tokio::sync::{mpsc, oneshot};
 use tokio::time::Instant;
+use tracing::{debug, error, info, warn};
 
 use crate::conf::ClientOption;
 use crate::error::{ClientError, ErrorKind};
@@ -48,7 +48,6 @@ use crate::util::{handle_response_status, 
select_message_queue};
 
 #[derive(Debug)]
 pub(crate) struct Client {
-    logger: Logger,
     option: ClientOption,
     session_manager: Arc<SessionManager>,
     route_manager: TopicRouteManager,
@@ -62,7 +61,6 @@ pub(crate) struct Client {
 #[derive(Debug, Clone)]
 pub(crate) struct TopicRouteManager {
     route_table: Arc<Mutex<HashMap<String /* topic */, RouteStatus>>>,
-    logger: Logger,
     access_endpoints: Endpoints,
     namespace: String,
 }
@@ -82,21 +80,16 @@ const OPERATION_ACK_MESSAGE: &str = "client.ack_message";
 #[automock]
 impl Client {
     pub(crate) fn new(
-        logger: &Logger,
         option: ClientOption,
         settings: TelemetryCommand,
     ) -> Result<Self, ClientError> {
         let id = Self::generate_client_id();
         let endpoints = Endpoints::from_url(option.access_url())
             .map_err(|e| e.with_operation(OPERATION_CLIENT_NEW))?;
-        let session_manager = SessionManager::new(logger, id.clone(), &option);
-        let route_manager = TopicRouteManager::new(
-            logger.clone(),
-            option.get_namespace().to_string(),
-            endpoints.clone(),
-        );
+        let session_manager = SessionManager::new(id.clone(), &option);
+        let route_manager =
+            TopicRouteManager::new(option.get_namespace().to_string(), 
endpoints.clone());
         Ok(Client {
-            logger: logger.new(o!("component" => "client")),
             option,
             session_manager: Arc::new(session_manager),
             route_manager,
@@ -124,7 +117,6 @@ impl Client {
         &mut self,
         telemetry_command_tx: mpsc::Sender<pb::telemetry_command::Command>,
     ) -> Result<(), ClientError> {
-        let logger = self.logger.clone();
         let session_manager = self.session_manager.clone();
 
         let group = self.option.group.clone();
@@ -157,7 +149,6 @@ impl Client {
                         let sessions = 
session_manager.get_all_sessions().await;
                         if sessions.is_err() {
                             error!(
-                                logger,
                                 "send heartbeat failed: failed to get 
sessions: {}",
                                 sessions.unwrap_err()
                             );
@@ -169,7 +160,6 @@ impl Client {
                             let response = Self::heart_beat_inner(session, 
&group, &namespace, &client_type).await;
                             if response.is_err() {
                                 error!(
-                                    logger,
                                     "send heartbeat failed: failed to send 
heartbeat rpc: {}",
                                     response.unwrap_err()
                                 );
@@ -179,45 +169,44 @@ impl Client {
                                 
handle_response_status(response.unwrap().status, OPERATION_HEARTBEAT);
                             if result.is_err() {
                                 error!(
-                                    logger,
                                     "send heartbeat failed: server return 
error: {}",
                                     result.unwrap_err()
                                 );
                                 continue;
                             }
-                            debug!(logger,"send heartbeat to server success, 
peer={}",peer);
+                            debug!("send heartbeat to server success, 
peer={}",peer);
                         }
                     },
                     _ = sync_settings_interval.tick() => {
                         let sessions = 
session_manager.get_all_sessions().await;
                         if sessions.is_err() {
-                            error!(logger, "sync settings failed: failed to 
get sessions: {}", sessions.unwrap_err());
+                            error!("sync settings failed: failed to get 
sessions: {}", sessions.unwrap_err());
                             continue;
                         }
                         for mut session in sessions.unwrap() {
                             let peer = session.peer().to_string();
                             let result = 
session.update_settings(settings.clone()).await;
                             if result.is_err() {
-                                error!(logger, "sync settings failed: failed 
to call rpc: {}", result.unwrap_err());
+                                error!("sync settings failed: failed to call 
rpc: {}", result.unwrap_err());
                                 continue;
                             }
-                            debug!(logger, "sync settings success, peer = {}", 
peer);
+                            debug!("sync settings success, peer = {}", peer);
                         }
 
                     },
                     _ = sync_route_timer.tick() => {
                         let result = route_manager.sync_route_data(&mut 
rpc_client).await;
                         if result.is_err() {
-                            error!(logger, "sync route failed: {}", 
result.unwrap_err());
+                            error!("sync route failed: {}", 
result.unwrap_err());
                         }
                     },
                     _ = &mut shutdown_rx => {
-                        info!(logger, "receive shutdown signal, stop heartbeat 
and telemetry tasks.");
+                        info!("receive shutdown signal, stop heartbeat and 
telemetry tasks.");
                         break;
                     }
                 }
             }
-            info!(logger, "heartbeat and telemetry task were stopped");
+            info!("heartbeat and telemetry task were stopped");
         });
         Ok(())
     }
@@ -516,9 +505,8 @@ impl Client {
 }
 
 impl TopicRouteManager {
-    pub(crate) fn new(logger: Logger, namespace: String, access_endpoints: 
Endpoints) -> Self {
+    pub(crate) fn new(namespace: String, access_endpoints: Endpoints) -> Self {
         Self {
-            logger,
             namespace,
             access_endpoints,
             route_table: Arc::new(Mutex::new(HashMap::new())),
@@ -533,7 +521,7 @@ impl TopicRouteManager {
         {
             topics = self.route_table.lock().keys().cloned().collect();
         }
-        debug!(self.logger, "sync topic route of topics {:?}", topics);
+        debug!("sync topic route of topics {:?}", topics);
         for topic in topics {
             self.topic_route_inner(rpc_client, &topic).await?;
         }
@@ -545,7 +533,7 @@ impl TopicRouteManager {
         rpc_client: &mut T,
         topics: Vec<String>,
     ) -> Result<(), ClientError> {
-        debug!(self.logger, "sync topic route of topics {:?}.", topics);
+        debug!("sync topic route of topics {:?}.", topics);
         for topic in topics {
             self.topic_route_inner(rpc_client, topic.as_str()).await?;
         }
@@ -557,7 +545,7 @@ impl TopicRouteManager {
         rpc_client: &mut T,
         topic: &str,
     ) -> Result<Arc<Route>, ClientError> {
-        debug!(self.logger, "query route for topic={}", topic);
+        debug!("query route for topic={}", topic);
         let rx = match self
             .route_table
             .lock()
@@ -604,10 +592,7 @@ impl TopicRouteManager {
 
         // send result to all waiters
         if let Ok(route) = result {
-            debug!(
-                self.logger,
-                "query route for topic={} success: route={:?}", topic, route
-            );
+            debug!("query route for topic={} success: route={:?}", topic, 
route);
             let route = Arc::new(route);
             let mut route_table_lock = self.route_table.lock();
 
@@ -620,7 +605,7 @@ impl TopicRouteManager {
 
             let prev =
                 route_table_lock.insert(topic.to_owned(), 
RouteStatus::Found(Arc::clone(&route)));
-            info!(self.logger, "update route for topic={}", topic);
+            info!("update route for topic={}", topic);
 
             if let Some(RouteStatus::Querying(Some(mut v))) = prev {
                 for item in v.drain(..) {
@@ -630,10 +615,7 @@ impl TopicRouteManager {
             Ok(route)
         } else {
             let err = result.unwrap_err();
-            warn!(
-                self.logger,
-                "query route for topic={} failed: error={}", topic, err
-            );
+            warn!("query route for topic={} failed: error={}", topic, err);
             let mut route_table_lock = self.route_table.lock();
             // keep the existing route if error occurs.
             if let Some(RouteStatus::Found(prev)) = 
route_table_lock.get(topic) {
@@ -704,7 +686,6 @@ impl TopicRouteManager {
 
 #[derive(Debug)]
 pub(crate) struct SessionManager {
-    logger: Logger,
     client_id: String,
     option: ClientOption,
     session_map: tokio::sync::Mutex<HashMap<String, Session>>,
@@ -712,11 +693,9 @@ pub(crate) struct SessionManager {
 
 #[automock]
 impl SessionManager {
-    pub(crate) fn new(logger: &Logger, client_id: String, option: 
&ClientOption) -> Self {
-        let logger = logger.new(o!("component" => "session"));
+    pub(crate) fn new(client_id: String, option: &ClientOption) -> Self {
         let session_map = tokio::sync::Mutex::new(HashMap::new());
         SessionManager {
-            logger,
             client_id,
             option: option.clone(),
             session_map,
@@ -734,13 +713,7 @@ impl SessionManager {
         if session_map.contains_key(&endpoint_url) {
             Ok(session_map.get(&endpoint_url).unwrap().shadow_session())
         } else {
-            let mut session = Session::new(
-                &self.logger,
-                endpoints,
-                self.client_id.clone(),
-                &self.option,
-            )
-            .await?;
+            let mut session = Session::new(endpoints, self.client_id.clone(), 
&self.option).await?;
             session.start(settings, telemetry_command_tx).await?;
             let shadow_session = session.shadow_session();
             session_map.insert(endpoint_url.clone(), session);
@@ -777,7 +750,6 @@ pub(crate) mod tests {
     use crate::client::Client;
     use crate::conf::ClientOption;
     use crate::error::{ClientError, ErrorKind};
-    use crate::log::terminal_logger;
     use crate::model::common::{ClientType, Route};
     use crate::pb::{
         AckMessageEntry, AckMessageResponse, ChangeInvisibleDurationResponse, 
Code,
@@ -794,7 +766,6 @@ pub(crate) mod tests {
 
     fn new_route_manager_for_test() -> TopicRouteManager {
         TopicRouteManager {
-            logger: terminal_logger(),
             route_table: Arc::new(Mutex::new(HashMap::new())),
             access_endpoints: 
Endpoints::from_url("http://localhost:8081";).unwrap(),
             namespace: "".to_string(),
@@ -803,7 +774,6 @@ pub(crate) mod tests {
 
     fn new_session_manager() -> SessionManager {
         SessionManager::new(
-            &terminal_logger(),
             Client::generate_client_id(),
             &ClientOption {
                 group: Some("group".to_string()),
@@ -814,7 +784,6 @@ pub(crate) mod tests {
 
     fn new_client_for_test() -> Client {
         Client {
-            logger: terminal_logger(),
             option: ClientOption {
                 group: Some("group".to_string()),
                 ..Default::default()
@@ -832,7 +801,6 @@ pub(crate) mod tests {
     fn new_client_with_session_manager(session_manager: SessionManager) -> 
Client {
         let (tx, _) = mpsc::channel(16);
         Client {
-            logger: terminal_logger(),
             option: ClientOption::default(),
             session_manager: Arc::new(session_manager),
             route_manager: new_route_manager_for_test(),
@@ -853,11 +821,7 @@ pub(crate) mod tests {
 
     #[test]
     fn client_new() -> Result<(), ClientError> {
-        Client::new(
-            &terminal_logger(),
-            ClientOption::default(),
-            TelemetryCommand::default(),
-        )?;
+        Client::new(ClientOption::default(), TelemetryCommand::default())?;
         Ok(())
     }
 
@@ -870,7 +834,7 @@ pub(crate) mod tests {
     #[tokio::test]
     async fn client_get_session() {
         let context = MockSession::new_context();
-        context.expect().returning(|_, _, _, _| {
+        context.expect().returning(|_, _, _| {
             let mut session = MockSession::default();
             session.expect_start().returning(|_, _| Ok(()));
             session
diff --git a/rust/src/conf.rs b/rust/src/conf.rs
index 79d92f60..ec61d1ca 100644
--- a/rust/src/conf.rs
+++ b/rust/src/conf.rs
@@ -123,19 +123,9 @@ impl ClientOption {
     }
 }
 
-/// Log format for output.
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub enum LoggingFormat {
-    /// Print log in terminal
-    Terminal,
-    /// Print log in json file
-    Json,
-}
-
 /// The configuration of [`Producer`].
 #[derive(Debug, Clone)]
 pub struct ProducerOption {
-    logging_format: LoggingFormat,
     prefetch_route: bool,
     topics: Option<Vec<String>>,
     namespace: String,
@@ -146,7 +136,6 @@ pub struct ProducerOption {
 impl Default for ProducerOption {
     fn default() -> Self {
         ProducerOption {
-            logging_format: LoggingFormat::Terminal,
             prefetch_route: true,
             topics: None,
             namespace: "".to_string(),
@@ -157,15 +146,6 @@ impl Default for ProducerOption {
 }
 
 impl ProducerOption {
-    /// Get the logging format of producer
-    pub fn logging_format(&self) -> &LoggingFormat {
-        &self.logging_format
-    }
-    /// Set the logging format for producer
-    pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
-        self.logging_format = logging_format;
-    }
-
     /// Whether to prefetch route info
     pub fn prefetch_route(&self) -> &bool {
         &self.prefetch_route
@@ -209,7 +189,6 @@ impl ProducerOption {
 /// The configuration of [`SimpleConsumer`].
 #[derive(Debug, Clone)]
 pub struct SimpleConsumerOption {
-    logging_format: LoggingFormat,
     consumer_group: String,
     prefetch_route: bool,
     topics: Option<Vec<String>>,
@@ -221,7 +200,6 @@ pub struct SimpleConsumerOption {
 impl Default for SimpleConsumerOption {
     fn default() -> Self {
         SimpleConsumerOption {
-            logging_format: LoggingFormat::Terminal,
             consumer_group: "".to_string(),
             prefetch_route: true,
             topics: None,
@@ -233,15 +211,6 @@ impl Default for SimpleConsumerOption {
 }
 
 impl SimpleConsumerOption {
-    /// Set the logging format of simple consumer
-    pub fn logging_format(&self) -> &LoggingFormat {
-        &self.logging_format
-    }
-    /// set the logging format for simple consumer
-    pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
-        self.logging_format = logging_format;
-    }
-
     /// Get the consumer group of simple consumer
     pub fn consumer_group(&self) -> &str {
         &self.consumer_group
@@ -288,7 +257,6 @@ impl SimpleConsumerOption {
 
 #[derive(Debug, Clone)]
 pub struct PushConsumerOption {
-    logging_format: LoggingFormat,
     consumer_group: String,
     namespace: String,
     timeout: Duration,
@@ -302,7 +270,6 @@ pub struct PushConsumerOption {
 impl Default for PushConsumerOption {
     fn default() -> Self {
         Self {
-            logging_format: LoggingFormat::Terminal,
             consumer_group: "".to_string(),
             namespace: "".to_string(),
             timeout: Duration::from_secs(3),
@@ -364,14 +331,6 @@ impl PushConsumerOption {
         self.fifo = fifo;
     }
 
-    pub fn logging_format(&self) -> &LoggingFormat {
-        &self.logging_format
-    }
-
-    pub fn set_logging_format(&mut self, logging_format: LoggingFormat) {
-        self.logging_format = logging_format;
-    }
-
     pub fn batch_size(&self) -> i32 {
         self.batch_size
     }
@@ -537,7 +496,6 @@ mod tests {
     #[test]
     fn conf_producer_option() {
         let option = ProducerOption::default();
-        assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
         assert!(option.prefetch_route());
         assert!(option.validate_message_type());
     }
@@ -545,7 +503,6 @@ mod tests {
     #[test]
     fn conf_simple_consumer_option() {
         let option = SimpleConsumerOption::default();
-        assert_eq!(option.logging_format(), &LoggingFormat::Terminal);
         assert!(option.prefetch_route());
     }
 
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index dc7478a0..c12eba67 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -128,8 +128,6 @@ pub use simple_consumer::SimpleConsumer;
 #[allow(dead_code)]
 pub mod conf;
 pub mod error;
-#[allow(dead_code)]
-mod log;
 
 mod client;
 
diff --git a/rust/src/log.rs b/rust/src/log.rs
deleted file mode 100644
index db2f4736..00000000
--- a/rust/src/log.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-use std::fs::OpenOptions;
-
-use slog::{o, Drain, Logger};
-use slog_async::OverflowStrategy;
-
-use crate::conf::LoggingFormat;
-
-pub(crate) fn terminal_logger() -> Logger {
-    let decorator = slog_term::TermDecorator::new().build();
-    let drain = slog_term::FullFormat::new(decorator)
-        .use_file_location()
-        .build()
-        .fuse();
-    let drain = slog_async::Async::new(drain)
-        .overflow_strategy(OverflowStrategy::Block)
-        .chan_size(1)
-        .build()
-        .fuse();
-    Logger::root(drain, o!())
-}
-
-fn json_logger(filepath: &str) -> Logger {
-    let file = OpenOptions::new()
-        .create(true)
-        .write(true)
-        .truncate(true)
-        .open(filepath)
-        .unwrap();
-    let decorator = slog_json::Json::default(file).fuse();
-    let drain = slog_async::Async::new(decorator).build().fuse();
-    Logger::root(drain, o!())
-}
-
-pub(crate) fn logger(logging_format: &LoggingFormat) -> Logger {
-    match logging_format {
-        LoggingFormat::Terminal => terminal_logger(),
-        LoggingFormat::Json => json_logger("logs/rocketmq_client.log"),
-    }
-}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index 94cd295d..8bce38aa 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -22,9 +22,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use mockall_double::double;
 use parking_lot::RwLock;
 use prost_types::Timestamp;
-use slog::{error, info, warn, Logger};
 use tokio::select;
 use tokio::sync::{mpsc, oneshot};
+use tracing::{error, info, warn};
 
 #[double]
 use crate::client::Client;
@@ -35,6 +35,7 @@ use crate::model::message::{self, MessageTypeAware, 
MessageView};
 use crate::model::transaction::{
     Transaction, TransactionChecker, TransactionImpl, TransactionResolution,
 };
+use crate::pb;
 use crate::pb::settings::PubSub;
 use crate::pb::telemetry_command::Command::{RecoverOrphanedTransactionCommand, 
Settings};
 use crate::pb::{Encoding, EndTransactionRequest, Resource, SystemProperties, 
TransactionSource};
@@ -43,7 +44,6 @@ use crate::util::{
     build_endpoints_by_message_queue, build_producer_settings, 
handle_response_status,
     select_message_queue, select_message_queue_by_message_group, HOST_NAME,
 };
-use crate::{log, pb};
 
 /// [`Producer`] is the core struct, to which application developers should 
turn, when publishing messages to RocketMQ proxy.
 ///
@@ -53,7 +53,6 @@ use crate::{log, pb};
 /// [`Producer`] is `Send` and `Sync` by design, so that developers may get 
started easily.
 pub struct Producer {
     option: Arc<RwLock<ProducerOption>>,
-    logger: Logger,
     client: Client,
     transaction_checker: Option<Box<TransactionChecker>>,
     shutdown_tx: Option<oneshot::Sender<()>>,
@@ -84,11 +83,9 @@ impl Producer {
             namespace: option.namespace().to_string(),
             ..client_option
         };
-        let logger = log::logger(option.logging_format());
-        let client = Client::new(&logger, client_option, 
build_producer_settings(&option))?;
+        let client = Client::new(client_option, 
build_producer_settings(&option))?;
         Ok(Producer {
             option: Arc::new(RwLock::new(option)),
-            logger,
             client,
             transaction_checker: None,
             shutdown_tx: None,
@@ -112,11 +109,9 @@ impl Producer {
             namespace: option.namespace().to_string(),
             ..client_option
         };
-        let logger = log::logger(option.logging_format());
-        let client = Client::new(&logger, client_option, 
build_producer_settings(&option))?;
+        let client = Client::new(client_option, 
build_producer_settings(&option))?;
         Ok(Producer {
             option: Arc::new(RwLock::new(option)),
-            logger,
             client,
             transaction_checker: Some(transaction_checker),
             shutdown_tx: None,
@@ -156,7 +151,6 @@ impl Producer {
         self.shutdown_tx = Some(shutdown_tx);
         let rpc_client = self.client.get_session().await?;
         let endpoints = self.client.get_endpoints();
-        let logger = self.logger.clone();
         let producer_option = Arc::clone(&self.option);
         tokio::spawn(async move {
             loop {
@@ -171,16 +165,16 @@ impl Producer {
                                             &transaction_checker,
                                             endpoints.clone()).await;
                                     if let Err(error) = result {
-                                        error!(logger, "handle trannsaction 
command failed: {:?}", error);
+                                        error!("handle trannsaction command 
failed: {:?}", error);
                                     };
                                 }
                                 Settings(command) => {
                                     let option = &mut producer_option.write();
                                     Self::handle_settings_command(command, 
option);
-                                    info!(logger, "handle setting command 
success.");
+                                    info!("handle setting command success.");
                                 }
                                 _ => {
-                                    warn!(logger, "unimplemented command 
{:?}", command);
+                                    warn!("unimplemented command {:?}", 
command);
                                 }
                             }
                         }
@@ -192,7 +186,6 @@ impl Producer {
             }
         });
         info!(
-            self.logger,
             "start producer success, client_id: {}",
             self.client.client_id()
         );
@@ -473,7 +466,6 @@ mod tests {
     use std::sync::Arc;
 
     use crate::error::ErrorKind;
-    use crate::log::terminal_logger;
     use crate::model::common::Route;
     use crate::model::message::{MessageBuilder, MessageImpl, MessageType};
     use crate::model::transaction::TransactionResolution;
@@ -487,7 +479,6 @@ mod tests {
     fn new_producer_for_test() -> Producer {
         Producer {
             option: Default::default(),
-            logger: terminal_logger(),
             client: Client::default(),
             shutdown_tx: None,
             transaction_checker: None,
@@ -497,7 +488,6 @@ mod tests {
     fn new_transaction_producer_for_test() -> Producer {
         Producer {
             option: Default::default(),
-            logger: terminal_logger(),
             client: Client::default(),
             shutdown_tx: None,
             transaction_checker: Some(Box::new(|_, _| 
TransactionResolution::COMMIT)),
@@ -509,7 +499,7 @@ mod tests {
         let _m = crate::client::tests::MTX.lock();
 
         let ctx = Client::new_context();
-        ctx.expect().return_once(|_, _, _| {
+        ctx.expect().return_once(|_, _| {
             let mut client = Client::default();
             client.expect_topic_route().returning(|_, _| {
                 Ok(Arc::new(Route {
@@ -542,7 +532,7 @@ mod tests {
         let _m = crate::client::tests::MTX.lock();
 
         let ctx = Client::new_context();
-        ctx.expect().return_once(|_, _, _| {
+        ctx.expect().return_once(|_, _| {
             let mut client = Client::default();
             client.expect_topic_route().returning(|_, _| {
                 Ok(Arc::new(Route {
diff --git a/rust/src/push_consumer.rs b/rust/src/push_consumer.rs
index 3ccac5cd..d3f9a23a 100644
--- a/rust/src/push_consumer.rs
+++ b/rust/src/push_consumer.rs
@@ -18,8 +18,6 @@
 use mockall::automock;
 use mockall_double::double;
 use parking_lot::{Mutex, RwLock};
-use slog::Logger;
-use slog::{debug, error, info, warn};
 use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
 use std::time::Duration;
@@ -34,6 +32,7 @@ use crate::conf::{BackOffRetryPolicy, ClientOption, 
PushConsumerOption};
 use crate::error::{ClientError, ErrorKind};
 use crate::model::common::{ClientType, ConsumeResult, MessageQueue};
 use crate::model::message::{AckMessageEntry, MessageView};
+use crate::pb;
 use crate::pb::receive_message_response::Content;
 use crate::pb::{
     AckMessageRequest, Assignment, ChangeInvisibleDurationRequest,
@@ -46,7 +45,7 @@ use crate::session::Session;
 use crate::util::{
     build_endpoints_by_message_queue, build_push_consumer_settings, 
handle_response_status,
 };
-use crate::{log, pb};
+use tracing::{debug, error, info, warn};
 
 const OPERATION_NEW_PUSH_CONSUMER: &str = "push_consumer.new";
 const OPERATION_RECEIVE_MESSAGE: &str = "push_consumer.receive_message";
@@ -58,7 +57,6 @@ const OPERATION_FORWARD_TO_DEADLETTER_QUEUE: &str = 
"push_consumer.forward_to_de
 pub type MessageListener = Box<dyn Fn(&MessageView) -> ConsumeResult + Send + 
Sync>;
 
 pub struct PushConsumer {
-    logger: Logger,
     client: Client,
     message_listener: Arc<MessageListener>,
     option: Arc<RwLock<PushConsumerOption>>,
@@ -91,14 +89,8 @@ impl PushConsumer {
             group: Some(option.consumer_group().to_string()),
             ..client_option
         };
-        let logger = log::logger(option.logging_format());
-        let client = Client::new(
-            &logger,
-            client_option,
-            build_push_consumer_settings(&option),
-        )?;
+        let client = Client::new(client_option, 
build_push_consumer_settings(&option))?;
         Ok(Self {
-            logger,
             client,
             message_listener: Arc::new(message_listener),
             option: Arc::new(RwLock::new(option)),
@@ -125,7 +117,6 @@ impl PushConsumer {
         route_manager
             .sync_topic_routes(&mut rpc_client, topics)
             .await?;
-        let logger = self.logger.clone();
         let mut actor_table: HashMap<MessageQueue, MessageQueueActor> = 
HashMap::new();
 
         let message_listener = Arc::clone(&self.message_listener);
@@ -155,7 +146,7 @@ impl PushConsumer {
                             option_retry_policy = retry_policy.lock().clone();
                         }
                         if option_retry_policy.is_none() {
-                            warn!(logger, "retry policy is not set. skip 
scanning.");
+                            warn!("retry policy is not set. skip scanning.");
                             continue;
                         }
                         let retry_policy_inner = option_retry_policy.unwrap();
@@ -181,7 +172,7 @@ impl PushConsumer {
                                 let result = 
rpc_client.query_assignment(request).await;
                                 if let Ok(response) = result {
                                     if handle_response_status(response.status, 
OPERATION_START_PUSH_CONSUMER).is_ok() {
-                                            let result = 
Self::process_assignments(logger.clone(),
+                                            let result = 
Self::process_assignments(
                                                 &rpc_client,
                                                 &consumer_option,
                                                 Arc::clone(&message_listener),
@@ -190,20 +181,20 @@ impl PushConsumer {
                                                 retry_policy_inner.clone(),
                                             ).await;
                                             if result.is_err() {
-                                                error!(logger, "process 
assignments failed: {:?}", result.unwrap_err());
+                                                error!("process assignments 
failed: {:?}", result.unwrap_err());
                                             }
                                     } else {
-                                        error!(logger, "query assignment 
failed, no status in response.");
+                                        error!("query assignment failed, no 
status in response.");
                                     }
                                 } else {
-                                    error!(logger, "query assignment failed: 
{:?}", result.unwrap_err());
+                                    error!("query assignment failed: {:?}", 
result.unwrap_err());
                                 }
                             }
                         }
                     }
                     _ = shutdown_token.cancelled() => {
                         let entries = actor_table.drain();
-                        info!(logger, "shutdown {:?} actors", entries.len());
+                        info!("shutdown {:?} actors", entries.len());
                         for (_, actor) in entries {
                             let _ = actor.shutdown().await;
                         }
@@ -216,7 +207,6 @@ impl PushConsumer {
     }
 
     async fn process_assignments(
-        logger: Logger,
         rpc_client: &Session,
         option: &PushConsumerOption,
         message_listener: Arc<MessageListener>,
@@ -261,7 +251,6 @@ impl PushConsumer {
             }
             let option = option.clone();
             let mut actor = MessageQueueActor::new(
-                logger.clone(),
                 rpc_client.shadow_session(),
                 message_queue.clone(),
                 option,
@@ -290,7 +279,6 @@ impl PushConsumer {
 
     async fn receive_messages<T: RPCClient + 'static>(
         rpc_client: &mut T,
-        logger: &Logger,
         message_queue: &MessageQueue,
         option: &PushConsumerOption,
     ) -> Result<Vec<MessageView>, ClientError> {
@@ -326,10 +314,10 @@ impl PushConsumer {
                 let content = response.content.unwrap();
                 match content {
                     Content::Status(status) => {
-                        warn!(logger, "unhandled status message {:?}", status);
+                        warn!("unhandled status message {:?}", status);
                     }
                     Content::DeliveryTimestamp(_) => {
-                        warn!(logger, "unhandled delivery timestamp message");
+                        warn!("unhandled delivery timestamp message");
                     }
                     Content::Message(message) => {
                         messages.push(
@@ -424,7 +412,6 @@ impl DlqEntry {
 }
 
 struct MessageQueueActor {
-    logger: Logger,
     rpc_client: Session,
     message_queue: MessageQueue,
     option: PushConsumerOption,
@@ -437,7 +424,6 @@ struct MessageQueueActor {
 #[automock]
 impl MessageQueueActor {
     pub(crate) fn new(
-        logger: Logger,
         rpc_client: Session,
         message_queue: MessageQueue,
         option: PushConsumerOption,
@@ -445,7 +431,6 @@ impl MessageQueueActor {
         retry_policy: BackOffRetryPolicy,
     ) -> Self {
         Self {
-            logger,
             rpc_client,
             message_queue,
             option,
@@ -466,10 +451,7 @@ impl MessageQueueActor {
     }
 
     pub(crate) async fn start(&mut self) -> Result<(), ClientError> {
-        debug!(
-            self.logger,
-            "start a new queue actor {:?}", self.message_queue
-        );
+        debug!("start a new queue actor {:?}", self.message_queue);
         let shutdown_token = CancellationToken::new();
         self.shutdown_token = Some(shutdown_token.clone());
         let task_tracker = TaskTracker::new();
@@ -479,23 +461,18 @@ impl MessageQueueActor {
         if self.option.fifo() {
             consumer_worker_count = 1;
         }
-        info!(
-            self.logger,
-            "start consumer worker count: {}", consumer_worker_count
-        );
+        info!("start consumer worker count: {}", consumer_worker_count);
 
         for _ in 0..consumer_worker_count {
             // create consumer worker
             let mut consumer_worker: ConsumerWorker;
             if self.option.fifo() {
                 consumer_worker = ConsumerWorker::Fifo(FifoConsumerWorker::new(
-                    self.logger.clone(),
                     self.rpc_client.shadow_session(),
                     Arc::clone(&self.message_listener),
                 ));
             } else {
                 consumer_worker = 
ConsumerWorker::Standard(StandardConsumerWorker::new(
-                    self.logger.clone(),
                     self.rpc_client.shadow_session(),
                     Arc::clone(&self.message_listener),
                 ));
@@ -505,21 +482,19 @@ impl MessageQueueActor {
             let option = self.option.clone();
             let retry_policy = self.retry_policy.clone();
             let mut ack_processor = AckEntryProcessor::new(
-                self.logger.clone(),
                 self.rpc_client.shadow_session(),
                 self.option.get_consumer_group_resource(),
                 self.message_queue.topic.to_owned(),
             );
             ack_processor.start().await?;
             let shutdown_token = shutdown_token.clone();
-            let logger = self.logger.clone();
             task_tracker.spawn(async move {
                 loop {
                     select! {
                         _ = ticker.tick() => {
                             let result = 
consumer_worker.receive_messages(&message_queue, &option, &mut ack_processor, 
&retry_policy).await;
                             if result.is_err() {
-                                error!(logger, "receive messages error: {:?}", 
result.err());
+                                error!("receive messages error: {:?}", 
result.err());
                             }
                         }
                         _ = shutdown_token.cancelled() => {
@@ -534,7 +509,7 @@ impl MessageQueueActor {
     }
 
     pub(crate) async fn shutdown(mut self) -> Result<(), ClientError> {
-        debug!(self.logger, "shutdown queue actor {:?}", self.message_queue);
+        debug!("shutdown queue actor {:?}", self.message_queue);
         if let Some(shutdown_token) = self.shutdown_token.take() {
             shutdown_token.cancel();
         }
@@ -575,15 +550,13 @@ impl ConsumerWorker {
 }
 
 struct StandardConsumerWorker {
-    logger: Logger,
     rpc_client: Session,
     message_listener: Arc<MessageListener>,
 }
 
 impl StandardConsumerWorker {
-    fn new(logger: Logger, rpc_client: Session, message_listener: 
Arc<MessageListener>) -> Self {
+    fn new(rpc_client: Session, message_listener: Arc<MessageListener>) -> 
Self {
         Self {
-            logger,
             rpc_client,
             message_listener,
         }
@@ -596,13 +569,8 @@ impl StandardConsumerWorker {
         ack_processor: &mut AckEntryProcessor,
         retry_policy: &BackOffRetryPolicy,
     ) -> Result<(), ClientError> {
-        let messages = PushConsumer::receive_messages(
-            &mut self.rpc_client,
-            &self.logger,
-            message_queue,
-            option,
-        )
-        .await?;
+        let messages =
+            PushConsumer::receive_messages(&mut self.rpc_client, 
message_queue, option).await?;
         for message in messages {
             let consume_result = (self.message_listener)(&message);
             match consume_result {
@@ -623,15 +591,13 @@ impl StandardConsumerWorker {
 }
 
 struct FifoConsumerWorker {
-    logger: Logger,
     rpc_client: Session,
     message_listener: Arc<MessageListener>,
 }
 
 impl FifoConsumerWorker {
-    fn new(logger: Logger, rpc_client: Session, message_listener: 
Arc<MessageListener>) -> Self {
+    fn new(rpc_client: Session, message_listener: Arc<MessageListener>) -> 
Self {
         Self {
-            logger,
             rpc_client,
             message_listener,
         }
@@ -644,13 +610,8 @@ impl FifoConsumerWorker {
         ack_processor: &mut AckEntryProcessor,
         retry_policy: &BackOffRetryPolicy,
     ) -> Result<(), ClientError> {
-        let messages = PushConsumer::receive_messages(
-            &mut self.rpc_client,
-            &self.logger,
-            message_queue,
-            option,
-        )
-        .await?;
+        let messages =
+            PushConsumer::receive_messages(&mut self.rpc_client, 
message_queue, option).await?;
         for message in messages {
             let mut delivery_attempt = message.delivery_attempt();
             let max_delivery_attempts = retry_policy.get_max_attempts();
@@ -688,7 +649,6 @@ impl FifoConsumerWorker {
 }
 
 struct AckEntryProcessor {
-    logger: Logger,
     rpc_client: Session,
     consumer_group: Resource,
     topic: Resource,
@@ -698,9 +658,8 @@ struct AckEntryProcessor {
 }
 
 impl AckEntryProcessor {
-    fn new(logger: Logger, rpc_client: Session, consumer_group: Resource, 
topic: Resource) -> Self {
+    fn new(rpc_client: Session, consumer_group: Resource, topic: Resource) -> 
Self {
         Self {
-            logger,
             rpc_client,
             consumer_group,
             topic,
@@ -712,7 +671,6 @@ impl AckEntryProcessor {
 
     fn shadow_self(&self) -> Self {
         Self {
-            logger: self.logger.clone(),
             rpc_client: self.rpc_client.shadow_session(),
             consumer_group: self.consumer_group.clone(),
             topic: self.topic.clone(),
@@ -730,17 +688,10 @@ impl AckEntryProcessor {
                     .send(AckEntryItem::Ack(AckEntry::new(message)))
                     .await;
                 if send_result.is_err() {
-                    error!(
-                        self.logger,
-                        "put ack entry to queue error {:?}",
-                        send_result.err()
-                    );
+                    error!("put ack entry to queue error {:?}", 
send_result.err());
                 }
             } else {
-                error!(
-                    self.logger,
-                    "The ack entry sender is not set. Drop the ack message."
-                );
+                error!("The ack entry sender is not set. Drop the ack 
message.");
             }
         }
     }
@@ -762,17 +713,10 @@ impl AckEntryProcessor {
                     )))
                     .await;
                 if send_result.is_err() {
-                    error!(
-                        self.logger,
-                        "put nack entry to queue error {:?}",
-                        send_result.err()
-                    );
+                    error!("put nack entry to queue error {:?}", 
send_result.err());
                 }
             } else {
-                error!(
-                    self.logger,
-                    "Drop the nack message due to ack entry sender is not set."
-                )
+                error!("Drop the nack message due to ack entry sender is not 
set.")
             }
         }
     }
@@ -796,17 +740,10 @@ impl AckEntryProcessor {
                     )))
                     .await;
                 if send_result.is_err() {
-                    error!(
-                        self.logger,
-                        "put dlq entry to queue error {:?}",
-                        send_result.err()
-                    );
+                    error!("put dlq entry to queue error {:?}", 
send_result.err());
                 }
             } else {
-                error!(
-                    self.logger,
-                    "Drop the dlq message due to ack edntry sender is not set."
-                );
+                error!("Drop the dlq message due to ack edntry sender is not 
set.");
             }
         }
     }
@@ -869,15 +806,15 @@ impl AckEntryProcessor {
                     _ = ack_ticker.tick() => {
                         let result = processor.process_ack_entry_queue(&mut 
ack_entry_queue).await;
                         if result.is_err() {
-                            error!(processor.logger, "process ack entry queue 
failed: {:?}", result);
+                            error!("process ack entry queue failed: {:?}", 
result);
                         }
                     }
                     Some(ack_entry) = ack_entry_receiver.recv() => {
                         ack_entry_queue.push_back(ack_entry);
-                        debug!(processor.logger, "ack entry queue size: {}", 
ack_entry_queue.len());
+                        debug!("ack entry queue size: {}", 
ack_entry_queue.len());
                     }
                     _ = shutdown_token.cancelled() => {
-                        info!(processor.logger, "need to process remaining {} 
entries on shutdown.", ack_entry_queue.len());
+                        info!("need to process remaining {} entries on 
shutdown.", ack_entry_queue.len());
                         processor.flush_ack_entry_queue(&mut 
ack_entry_queue).await;
                         break;
                     }
@@ -913,10 +850,7 @@ impl AckEntryProcessor {
             if result.is_ok() {
                 ack_entry_queue.pop_front();
             } else {
-                error!(
-                    self.logger,
-                    "ack message failed: {:?}, will deliver later.", result
-                );
+                error!("ack message failed: {:?}, will deliver later.", 
result);
                 ack_entry_item.inc_attempt();
             }
         }
@@ -965,15 +899,12 @@ impl AckEntryProcessor {
             shutdown_token.cancel();
         }
         if let Some(task_tracker) = self.task_tracker.take() {
-            info!(self.logger, "waiting for task to complete");
+            info!("waiting for task to complete");
             task_tracker.close();
             task_tracker.wait().await;
-            info!(self.logger, "task completed");
+            info!("task completed");
         }
-        info!(
-            self.logger,
-            "The ack entry processor shuts down completely."
-        );
+        info!("The ack entry processor shuts down completely.");
     }
 }
 
@@ -982,7 +913,6 @@ mod tests {
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::{str::FromStr, vec};
 
-    use log::terminal_logger;
     use pb::{
         AckMessageResponse, Address, Broker, ChangeInvisibleDurationResponse, 
Code,
         ForwardMessageToDeadLetterQueueResponse, QueryRouteResponse, 
ReceiveMessageResponse,
@@ -1021,7 +951,7 @@ mod tests {
         option2.set_consumer_group("test");
         option2.subscribe("test_topic", FilterExpression::new(FilterType::Tag, 
"*"));
         let context = Client::new_context();
-        context.expect().returning(|_, _, _| Ok(Client::default()));
+        context.expect().returning(|_, _| Ok(Client::default()));
         let result3 = PushConsumer::new(
             ClientOption::default(),
             option2,
@@ -1061,12 +991,11 @@ mod tests {
         });
         client.expect_get_route_manager().returning(|| {
             TopicRouteManager::new(
-                terminal_logger(),
                 "".to_string(),
                 Endpoints::from_url("http://localhost:8081";).unwrap(),
             )
         });
-        context.expect().return_once(|_, _, _| Ok(client));
+        context.expect().return_once(|_, _| Ok(client));
         let mut push_consumer_option = PushConsumerOption::default();
         push_consumer_option.set_consumer_group("test_group");
         push_consumer_option.subscribe("test_topic", 
FilterExpression::new(FilterType::Tag, "*"));
@@ -1084,7 +1013,6 @@ mod tests {
     #[tokio::test]
     async fn test_process_assignments_invalid_assignment() -> Result<(), 
ClientError> {
         let rpc_client = Session::default();
-        let logger = terminal_logger();
         let option = &PushConsumerOption::default();
         let message_listener: Arc<MessageListener> = Arc::new(Box::new(|_| 
ConsumeResult::SUCCESS));
         let mut actor_table: HashMap<MessageQueue, MessageQueueActor> = 
HashMap::new();
@@ -1100,13 +1028,12 @@ mod tests {
             }),
         }];
         let context = MockMessageQueueActor::new_context();
-        context.expect().returning(|_, _, _, _, _, _| {
+        context.expect().returning(|_, _, _, _, _| {
             let mut actor = MockMessageQueueActor::default();
             actor.expect_start().returning(|| Ok(()));
             return actor;
         });
         PushConsumer::process_assignments(
-            logger,
             &rpc_client,
             option,
             message_listener,
@@ -1121,7 +1048,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_process_assignments() -> Result<(), ClientError> {
-        let logger = terminal_logger();
         let option = &PushConsumerOption::default();
         let message_listener: Arc<MessageListener> = Arc::new(Box::new(|_| 
ConsumeResult::SUCCESS));
 
@@ -1163,7 +1089,6 @@ mod tests {
             }),
         }];
         PushConsumer::process_assignments(
-            logger,
             &rpc_client,
             option,
             message_listener,
@@ -1178,7 +1103,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_process_assignments_two_queues() -> Result<(), ClientError> {
-        let logger = terminal_logger();
         let option = &PushConsumerOption::default();
         let message_listener: Arc<MessageListener> = Arc::new(Box::new(|_| 
ConsumeResult::SUCCESS));
         let mut actor_table: HashMap<MessageQueue, MessageQueueActor> = 
HashMap::new();
@@ -1242,7 +1166,6 @@ mod tests {
             mock
         });
         PushConsumer::process_assignments(
-            logger.clone(),
             &session,
             option,
             message_listener,
@@ -1274,7 +1197,6 @@ mod tests {
             }),
         }];
         PushConsumer::process_assignments(
-            logger,
             &session,
             option,
             Arc::new(Box::new(|_| ConsumeResult::SUCCESS)),
@@ -1365,7 +1287,6 @@ mod tests {
             mock
         });
         let mut actor = MessageQueueActor::new(
-            terminal_logger(),
             session,
             message_queue,
             option,
@@ -1399,7 +1320,6 @@ mod tests {
             mock
         });
         let mut actor = MessageQueueActor::new(
-            terminal_logger(),
             session,
             message_queue,
             option,
@@ -1437,8 +1357,7 @@ mod tests {
             name: "test_topic".to_string(),
             resource_namespace: "".to_string(),
         };
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), rpc_client, 
consumer_group, topic);
+        let mut ack_processor = AckEntryProcessor::new(rpc_client, 
consumer_group, topic);
         let result = ack_processor.start().await;
         assert!(result.is_ok());
 
@@ -1487,8 +1406,7 @@ mod tests {
             name: "test_topic".to_string(),
             resource_namespace: "".to_string(),
         };
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), rpc_client, 
consumer_group, topic);
+        let mut ack_processor = AckEntryProcessor::new(rpc_client, 
consumer_group, topic);
         let result = ack_processor.start().await;
         assert!(result.is_ok());
 
@@ -1534,8 +1452,7 @@ mod tests {
             name: "test_topic".to_string(),
             resource_namespace: "".to_string(),
         };
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), rpc_client, 
consumer_group, topic);
+        let mut ack_processor = AckEntryProcessor::new(rpc_client, 
consumer_group, topic);
         let result = ack_processor.start().await;
         assert!(result.is_ok());
 
@@ -1590,8 +1507,7 @@ mod tests {
             name: "test_topic".to_string(),
             resource_namespace: "".to_string(),
         };
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), rpc_client, 
consumer_group, topic);
+        let mut ack_processor = AckEntryProcessor::new(rpc_client, 
consumer_group, topic);
         let result = ack_processor.start().await;
         assert!(result.is_ok());
 
@@ -1637,8 +1553,7 @@ mod tests {
             name: "test_topic".to_string(),
             resource_namespace: "".to_string(),
         };
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), rpc_client, 
consumer_group, topic);
+        let mut ack_processor = AckEntryProcessor::new(rpc_client, 
consumer_group, topic);
         let result = ack_processor.start().await;
         assert!(result.is_ok());
 
@@ -1692,8 +1607,7 @@ mod tests {
             name: "test_topic".to_string(),
             resource_namespace: "".to_string(),
         };
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), rpc_client, 
consumer_group, topic);
+        let mut ack_processor = AckEntryProcessor::new(rpc_client, 
consumer_group, topic);
         let result = ack_processor.start().await;
         assert!(result.is_ok());
 
@@ -1721,11 +1635,8 @@ mod tests {
                 content: Some(Content::Message(new_message())),
             }])
         });
-        let consumer_worker = StandardConsumerWorker::new(
-            terminal_logger(),
-            rpc_client,
-            Arc::new(Box::new(|_| ConsumeResult::SUCCESS)),
-        );
+        let consumer_worker =
+            StandardConsumerWorker::new(rpc_client, Arc::new(Box::new(|_| 
ConsumeResult::SUCCESS)));
         let consumer_group = Resource {
             name: "test_group".to_string(),
             resource_namespace: "".to_string(),
@@ -1757,8 +1668,7 @@ mod tests {
                 entries: vec![],
             })
         });
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), session, consumer_group, 
topic);
+        let mut ack_processor = AckEntryProcessor::new(session, 
consumer_group, topic);
         let is_start_ok = ack_processor.start().await;
         assert!(is_start_ok.is_ok());
         let result = ConsumerWorker::Standard(consumer_worker)
@@ -1780,11 +1690,8 @@ mod tests {
                 content: Some(Content::Message(new_message())),
             }])
         });
-        let consumer_worker = StandardConsumerWorker::new(
-            terminal_logger(),
-            rpc_client,
-            Arc::new(Box::new(|_| ConsumeResult::FAILURE)),
-        );
+        let consumer_worker =
+            StandardConsumerWorker::new(rpc_client, Arc::new(Box::new(|_| 
ConsumeResult::FAILURE)));
         let consumer_group = Resource {
             name: "test_group".to_string(),
             resource_namespace: "".to_string(),
@@ -1820,8 +1727,7 @@ mod tests {
                     receipt_handle: "".to_string(),
                 })
             });
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), session, consumer_group, 
topic);
+        let mut ack_processor = AckEntryProcessor::new(session, 
consumer_group, topic);
         let is_start_ok = ack_processor.start().await;
         assert!(is_start_ok.is_ok());
         let result = ConsumerWorker::Standard(consumer_worker)
@@ -1843,11 +1749,8 @@ mod tests {
                 content: Some(Content::Message(new_message())),
             }])
         });
-        let consumer_worker = FifoConsumerWorker::new(
-            terminal_logger(),
-            rpc_client,
-            Arc::new(Box::new(|_| ConsumeResult::SUCCESS)),
-        );
+        let consumer_worker =
+            FifoConsumerWorker::new(rpc_client, Arc::new(Box::new(|_| 
ConsumeResult::SUCCESS)));
         let consumer_group = Resource {
             name: "test_group".to_string(),
             resource_namespace: "".to_string(),
@@ -1877,8 +1780,7 @@ mod tests {
                 entries: vec![],
             })
         });
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), session, consumer_group, 
topic);
+        let mut ack_processor = AckEntryProcessor::new(session, 
consumer_group, topic);
         let result = ConsumerWorker::Fifo(consumer_worker)
             .receive_messages(
                 &new_message_queue(),
@@ -1901,7 +1803,6 @@ mod tests {
         let receive_call_count = Arc::new(AtomicUsize::new(0));
         let outer_call_counter = Arc::clone(&receive_call_count);
         let consumer_worker = FifoConsumerWorker::new(
-            terminal_logger(),
             rpc_client,
             Arc::new(Box::new(move |_| {
                 receive_call_count.fetch_add(1, Ordering::Relaxed);
@@ -1940,8 +1841,7 @@ mod tests {
                     }),
                 })
             });
-        let mut ack_processor =
-            AckEntryProcessor::new(terminal_logger(), session, consumer_group, 
topic);
+        let mut ack_processor = AckEntryProcessor::new(session, 
consumer_group, topic);
         let result = ConsumerWorker::Fifo(consumer_worker)
             .receive_messages(
                 &new_message_queue(),
diff --git a/rust/src/session.rs b/rust/src/session.rs
index ebd0687b..f9a08154 100644
--- a/rust/src/session.rs
+++ b/rust/src/session.rs
@@ -18,7 +18,6 @@
 use async_trait::async_trait;
 use mockall::{automock, mock};
 use ring::hmac;
-use slog::{debug, error, info, o, Logger};
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 use tokio::sync::{mpsc, oneshot};
@@ -42,6 +41,7 @@ use crate::pb::{
 };
 use crate::util::{PROTOCOL_VERSION, SDK_LANGUAGE, SDK_VERSION};
 use crate::{error::ClientError, 
pb::messaging_service_client::MessagingServiceClient};
+use tracing::{debug, error, info};
 
 const OPERATION_START: &str = "session.start";
 const OPERATION_UPDATE_SETTINGS: &str = "session.update_settings";
@@ -104,7 +104,6 @@ pub(crate) trait RPCClient {
 
 #[derive(Debug)]
 pub(crate) struct Session {
-    logger: Logger,
     client_id: String,
     option: ClientOption,
     endpoints: Endpoints,
@@ -122,7 +121,6 @@ impl Session {
 
     pub(crate) fn shadow_session(&self) -> Self {
         Session {
-            logger: self.logger.clone(),
             client_id: self.client_id.clone(),
             option: self.option.clone(),
             endpoints: self.endpoints.clone(),
@@ -134,7 +132,6 @@ impl Session {
     }
 
     pub(crate) async fn new(
-        logger: &Logger,
         endpoints: &Endpoints,
         client_id: String,
         option: &ClientOption,
@@ -171,14 +168,12 @@ impl Session {
 
         let stub = MessagingServiceClient::new(channel);
 
-        let logger = logger.new(o!("peer" => peer.clone()));
-        info!(logger, "create session success");
+        info!("create session success");
 
         Ok(Session {
-            logger,
+            client_id,
             option: option.clone(),
             endpoints: endpoints.clone(),
-            client_id,
             stub,
             telemetry_tx: None,
             telemetry_command_tx: None,
@@ -293,11 +288,8 @@ impl Session {
 
         self.establish_telemetry_stream(settings).await?;
 
-        debug!(
-            self.logger,
-            "telemetry_command_tx: {:?}", self.telemetry_command_tx
-        );
-        info!(self.logger, "starting client success");
+        debug!("telemetry_command_tx: {:?}", self.telemetry_command_tx);
+        info!("starting client success");
         Ok(())
     }
 
@@ -308,10 +300,7 @@ impl Session {
         if let Some(old_shutdown_tx) = self.shutdown_tx.take() {
             // a `send` error means the receiver is already gone, which is 
fine.
             let _ = old_shutdown_tx.send(());
-            info!(
-                self.logger,
-                "sent shutdown signal to the previous telemetry stream."
-            );
+            info!("sent shutdown signal to the previous telemetry stream.");
         }
 
         let (tx, rx) = mpsc::channel(16);
@@ -340,7 +329,6 @@ impl Session {
         self.shutdown_tx = Some(shutdown_tx);
         self.telemetry_tx = Some(tx);
 
-        let logger = self.logger.clone();
         let telemetry_command_tx = 
self.telemetry_command_tx.as_ref().unwrap().clone();
         tokio::spawn(async move {
             let mut stream = response.into_inner();
@@ -349,30 +337,30 @@ impl Session {
                     message = stream.message() => {
                         match message {
                             Ok(Some(item)) => {
-                                debug!(logger, "receive telemetry command: 
{:?}", item);
+                                debug!("receive telemetry command: {:?}", 
item);
                                 if let Some(command) = item.command {
                                     _ = 
telemetry_command_tx.send(command).await;
                                 }
                             }
                             Ok(None) => {
-                                info!(logger, "telemetry command stream closed 
by server");
+                                info!("telemetry command stream closed by 
server");
                                 break;
                             }
                             Err(e) => {
-                                error!(logger, "telemetry response error: 
{:?}", e);
+                                error!("telemetry response error: {:?}", e);
                                 break;
                             }
                         }
                     }
                     _ = &mut shutdown_rx => {
-                        info!(logger, "receive shutdown signal, stop dealing 
with telemetry command");
+                        info!("receive shutdown signal, stop dealing with 
telemetry command");
                         break;
                     }
                 }
             }
             // telemetry tx will be dropped here
         });
-        debug!(self.logger, "start session success");
+        debug!("start session success");
 
         Ok(())
     }
@@ -392,10 +380,7 @@ impl Session {
         settings: TelemetryCommand,
     ) -> Result<(), ClientError> {
         if self.is_started() {
-            debug!(
-                self.logger,
-                "session is already started: {:?}", self.telemetry_tx
-            );
+            debug!("session is already started: {:?}", self.telemetry_tx);
             if let Some(tx) = self.telemetry_tx.as_ref() {
                 tx.send(settings).await.map_err(|e| {
                     ClientError::new(
@@ -408,10 +393,9 @@ impl Session {
             }
             Ok(())
         } else {
-            debug!(self.logger, "session is closed: {:?}", self.telemetry_tx);
+            debug!("session is closed: {:?}", self.telemetry_tx);
             self.establish_telemetry_stream(settings).await?;
             debug!(
-                self.logger,
                 "session is established, closed: {:?}",
                 self.telemetry_tx.as_ref().unwrap().is_closed()
             );
@@ -636,7 +620,6 @@ mock! {
     pub(crate) Session {
         pub(crate) fn shadow_session(&self) -> Self;
         pub(crate) async fn new(
-            logger: &Logger,
             endpoints: &Endpoints,
             client_id: String,
             option: &ClientOption,
@@ -703,14 +686,8 @@ mock! {
 
 #[cfg(test)]
 mod tests {
-    use mockall::predicate::always;
-    use slog::debug;
     use wiremock_grpc::generate;
 
-    use crate::conf::ProducerOption;
-    use crate::log::terminal_logger;
-    use crate::util::build_producer_settings;
-
     use super::*;
 
     generate!("apache.rocketmq.v2", RocketMQMockServer);
@@ -718,31 +695,25 @@ mod tests {
     #[tokio::test]
     async fn session_new() {
         let server = RocketMQMockServer::start_default().await;
-        let logger = terminal_logger();
-        let mut client_option = ClientOption::default();
-        client_option.set_enable_tls(false);
         let session = Session::new(
-            &logger,
             &Endpoints::from_url(&format!("localhost:{}", 
server.address().port())).unwrap(),
             "test_client".to_string(),
-            &client_option,
+            &ClientOption::default(),
         )
         .await;
-        debug!(logger, "session: {:?}", session);
+        debug!("session: {:?}", session);
         assert!(session.is_ok());
     }
 
     #[tokio::test]
     async fn session_new_multi_addr() {
-        let logger = terminal_logger();
         let session = Session::new(
-            &logger,
             &Endpoints::from_url("127.0.0.1:8080,127.0.0.1:8081").unwrap(),
             "test_client".to_string(),
             &ClientOption::default(),
         )
         .await;
-        debug!(logger, "session: {:?}", session);
+        debug!("session: {:?}", session);
         assert!(session.is_ok());
     }
 
diff --git a/rust/src/simple_consumer.rs b/rust/src/simple_consumer.rs
index cc15949d..c78bed81 100644
--- a/rust/src/simple_consumer.rs
+++ b/rust/src/simple_consumer.rs
@@ -18,9 +18,9 @@
 use std::time::Duration;
 
 use mockall_double::double;
-use slog::{info, warn, Logger};
 use tokio::select;
 use tokio::sync::{mpsc, oneshot};
+use tracing::{info, warn};
 
 #[double]
 use crate::client::Client;
@@ -31,7 +31,6 @@ use crate::model::message::{AckMessageEntry, MessageView};
 use crate::util::{
     build_endpoints_by_message_queue, build_simple_consumer_settings, 
select_message_queue,
 };
-use crate::{log, pb};
 
 /// [`SimpleConsumer`] is a lightweight consumer to consume messages from 
RocketMQ proxy.
 ///
@@ -45,7 +44,6 @@ use crate::{log, pb};
 #[derive(Debug)]
 pub struct SimpleConsumer {
     option: SimpleConsumerOption,
-    logger: Logger,
     client: Client,
     shutdown_tx: Option<oneshot::Sender<()>>,
 }
@@ -74,15 +72,9 @@ impl SimpleConsumer {
             namespace: option.namespace().to_string(),
             ..client_option
         };
-        let logger = log::logger(option.logging_format());
-        let client = Client::new(
-            &logger,
-            client_option,
-            build_simple_consumer_settings(&option),
-        )?;
+        let client = Client::new(client_option, 
build_simple_consumer_settings(&option))?;
         Ok(SimpleConsumer {
             option,
-            logger,
             client,
             shutdown_tx: None,
         })
@@ -106,12 +98,11 @@ impl SimpleConsumer {
         }
         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
         self.shutdown_tx = Some(shutdown_tx);
-        let logger = self.logger.clone();
         tokio::spawn(async move {
             loop {
                 select! {
                     command = telemetry_command_rx.recv() => {
-                        warn!(logger, "command {:?} cannot be handled in 
simple consumer.", command);
+                        warn!("command {:?} cannot be handled in simple 
consumer.", command);
                     }
 
                     _ = &mut shutdown_rx => {
@@ -121,7 +112,6 @@ impl SimpleConsumer {
             }
         });
         info!(
-            self.logger,
             "start simple consumer success, client_id: {}",
             self.client.client_id()
         );
@@ -174,7 +164,7 @@ impl SimpleConsumer {
             .receive_message(
                 &endpoints,
                 message_queue,
-                pb::FilterExpression {
+                crate::pb::FilterExpression {
                     r#type: expression.filter_type() as i32,
                     expression: expression.expression().to_string(),
                 },
@@ -221,10 +211,9 @@ impl SimpleConsumer {
 mod tests {
     use std::sync::Arc;
 
-    use crate::log::terminal_logger;
     use crate::model::common::{FilterType, Route};
     use crate::pb::{
-        AckMessageResultEntry, Broker, Message, MessageQueue, Resource, 
SystemProperties,
+        AckMessageResultEntry, Broker, Endpoints, Message, MessageQueue, 
Resource, SystemProperties,
     };
 
     use super::*;
@@ -234,7 +223,7 @@ mod tests {
         let _m = crate::client::tests::MTX.lock();
 
         let ctx = Client::new_context();
-        ctx.expect().return_once(|_, _, _| {
+        ctx.expect().return_once(|_, _| {
             let mut client = Client::default();
             client.expect_topic_route().returning(|_, _| {
                 Ok(Arc::new(Route {
@@ -273,7 +262,7 @@ mod tests {
                     broker: Some(Broker {
                         name: "".to_string(),
                         id: 0,
-                        endpoints: Some(pb::Endpoints {
+                        endpoints: Some(Endpoints {
                             scheme: 0,
                             addresses: vec![],
                         }),
@@ -297,7 +286,6 @@ mod tests {
             .returning(|_: &MessageView| Ok(AckMessageResultEntry::default()));
         let simple_consumer = SimpleConsumer {
             option: SimpleConsumerOption::default(),
-            logger: terminal_logger(),
             client,
             shutdown_tx: None,
         };

Reply via email to