This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch refact/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/refact/cluster by this push:
     new 9b93de5  Mod: format code and fix some warnings (#167)
9b93de5 is described below

commit 9b93de56f296da3c1c2a55915c700e36b86ac3bb
Author: Yang Yang <962032...@qq.com>
AuthorDate: Sun Dec 10 13:53:45 2023 +0800

    Mod: format code and fix some warnings (#167)
    
    * style(dubbo): cargo fmt --all
    
    * style(dubbo): cargo fix --lib -p dubbo --allow-dirty
    
    * chore(github): update branch in pr
---
 .github/workflows/github-actions.yml        |   4 +-
 common/base/src/lib.rs                      |   2 +-
 dubbo/src/cluster/failover.rs               |  25 ++--
 dubbo/src/cluster/mod.rs                    |  38 +++---
 dubbo/src/codegen.rs                        |   3 +-
 dubbo/src/directory/mod.rs                  | 173 +++++++++++++---------------
 dubbo/src/invoker/clone_body.rs             |  50 +++-----
 dubbo/src/invoker/clone_invoker.rs          | 124 ++++++++++----------
 dubbo/src/invoker/mod.rs                    |   6 +-
 dubbo/src/lib.rs                            |  12 +-
 dubbo/src/loadbalancer/mod.rs               |  62 +++++-----
 dubbo/src/param.rs                          |   5 +-
 dubbo/src/protocol/mod.rs                   |   4 +-
 dubbo/src/protocol/triple/triple_invoker.rs |  36 +++---
 dubbo/src/registry/mod.rs                   |   3 +-
 dubbo/src/registry/n_registry.rs            |  32 ++---
 dubbo/src/route/mod.rs                      | 108 +++++++++--------
 dubbo/src/svc.rs                            |  28 ++---
 dubbo/src/triple/client/builder.rs          |  38 +++---
 dubbo/src/triple/client/triple.rs           |  28 ++---
 dubbo/src/triple/transport/connection.rs    |  31 ++---
 21 files changed, 386 insertions(+), 426 deletions(-)

diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml
index 19d8c4d..878a5e1 100644
--- a/.github/workflows/github-actions.yml
+++ b/.github/workflows/github-actions.yml
@@ -6,7 +6,9 @@ on:
   push:
     branches: ["*"]
   pull_request:
-    branches: ["*"]
+    branches:
+    - '*'
+    - 'refact/*'
 
 
 jobs:
diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs
index cb3bc09..b97b342 100644
--- a/common/base/src/lib.rs
+++ b/common/base/src/lib.rs
@@ -23,4 +23,4 @@ pub mod node;
 pub mod url;
 
 pub use node::Node;
-pub use url::Url;
\ No newline at end of file
+pub use url::Url;
diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs
index 1a8149c..8a00c9f 100644
--- a/dubbo/src/cluster/failover.rs
+++ b/dubbo/src/cluster/failover.rs
@@ -2,18 +2,17 @@ use std::task::Poll;
 
 use futures_util::future;
 use http::Request;
-use tower::{ServiceExt, retry::Retry, util::Oneshot};
+use tower::{retry::Retry, util::Oneshot, ServiceExt};
 use tower_service::Service;
- 
+
 use crate::StdError;
 
 pub struct Failover<N> {
-    inner: N // loadbalancer service
+    inner: N, // loadbalancer service
 }
 
 #[derive(Clone)]
 pub struct FailoverPolicy;
- 
 
 impl<N> Failover<N> {
     pub fn new(inner: N) -> Self {
@@ -25,14 +24,13 @@ impl<B, Res, E> tower::retry::Policy<Request<B>, Res, E> for FailoverPolicy
 where
     B: http_body::Body + Clone,
 {
-    
     type Future = future::Ready<Self>;
 
-    fn retry(&self, req: &Request<B>, result: Result<&Res, &E>) -> 
Option<Self::Future> {
+    fn retry(&self, _req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> {
         //TODO some error handling or logging
         match result {
             Ok(_) => None,
-            Err(_) => Some(future::ready(self.clone()))
+            Err(_) => Some(future::ready(self.clone())),
         }
     }
 
@@ -43,21 +41,18 @@ where
         *clone.headers_mut() = req.headers().clone();
         *clone.version_mut() = req.version();
 
-
         Some(clone)
     }
 }
 
-
-
-impl<N, B> Service<Request<B>> for Failover<N> 
+impl<N, B> Service<Request<B>> for Failover<N>
 where
     // B is CloneBody<B>
     B: http_body::Body + Clone,
     // loadbalancer service
-    N: Service<Request<B>> + Clone + 'static ,
+    N: Service<Request<B>> + Clone + 'static,
     N::Error: Into<StdError>,
-    N::Future: Send
+    N::Future: Send,
 {
     type Response = N::Response;
 
@@ -68,9 +63,9 @@ where
     fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.inner.poll_ready(cx)
     }
- 
+
     fn call(&mut self, req: Request<B>) -> Self::Future {
         let retry = Retry::new(FailoverPolicy, self.inner.clone());
         retry.oneshot(req)
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 0b1654a..1a20c16 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-
 use http::Request;
 use tower_service::Service;
 
-use crate::{codegen::RpcInvocation, svc::NewService, param::Param, 
invoker::clone_body::CloneBody};
+use crate::{
+    codegen::RpcInvocation, invoker::clone_body::CloneBody, param::Param, svc::NewService,
+};
 
 use self::failover::Failover;
 
@@ -30,12 +31,10 @@ pub struct NewCluster<N> {
 }
 
 pub struct Cluster<S> {
-    inner: S // failover service
+    inner: S, // failover service
 }
 
-
 impl<N> NewCluster<N> {
- 
     pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
         tower_layer::layer_fn(|inner: N| {
             NewCluster {
@@ -43,45 +42,44 @@ impl<N> NewCluster<N> {
             }
         })
     }
+}
 
-} 
-
-impl<S, T> NewService<T> for NewCluster<S> 
+impl<S, T> NewService<T> for NewCluster<S>
 where
-    T: Param<RpcInvocation>, 
+    T: Param<RpcInvocation>,
     // new loadbalancer service
     S: NewService<T>,
-{ 
-
+{
     type Service = Cluster<Failover<S::Service>>;
- 
-    fn new_service(&self, target: T) -> Self::Service {
 
+    fn new_service(&self, target: T) -> Self::Service {
         Cluster {
-            inner: Failover::new(self.inner.new_service(target))
+            inner: Failover::new(self.inner.new_service(target)),
         }
     }
 }
-  
-impl<S> Service<Request<hyper::Body>> for Cluster<S> 
+
+impl<S> Service<Request<hyper::Body>> for Cluster<S>
 where
     S: Service<Request<CloneBody>>,
 {
-
     type Response = S::Response;
 
     type Error = S::Error;
 
     type Future = S::Future;
 
-    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> 
std::task::Poll<Result<(), Self::Error>> {
+    fn poll_ready(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
         self.inner.poll_ready(cx)
     }
- 
+
     fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
         let (parts, body) = req.into_parts();
         let clone_body = CloneBody::new(body);
-        let req = Request::from_parts(parts, clone_body); 
+        let req = Request::from_parts(parts, clone_body);
         self.inner.call(req)
     }
 }
diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs
index 38f8f1d..452f560 100644
--- a/dubbo/src/codegen.rs
+++ b/dubbo/src/codegen.rs
@@ -44,8 +44,7 @@ pub use super::{
 pub use crate::{
     filter::{service::FilterService, Filter},
     triple::{
-        client::builder::ClientBuilder,
-        server::builder::ServerBuilder,
+        client::builder::ClientBuilder, server::builder::ServerBuilder,
         transport::connection::Connection,
     },
 };
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
index a4cf466..84900aa 100644
--- a/dubbo/src/directory/mod.rs
+++ b/dubbo/src/directory/mod.rs
@@ -16,107 +16,109 @@
  */
 
 use std::{
-    task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, pin::Pin,
+    collections::HashMap,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+use crate::{
+    codegen::{RpcInvocation, TripleInvoker},
+    invocation::Invocation,
+    invoker::{clone_invoker::CloneInvoker, NewInvoker},
+    param::Param,
+    registry::n_registry::Registry,
+    svc::NewService,
+    StdError,
 };
-    
-use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, 
invocation::Invocation, registry::n_registry::Registry, 
invoker::{NewInvoker,clone_invoker::CloneInvoker}, svc::NewService, 
param::Param};
 use dubbo_logger::tracing::debug;
 use futures_core::ready;
 use futures_util::future;
 use tokio::sync::mpsc::channel;
 use tokio_stream::wrappers::ReceiverStream;
 use tower::{
-    discover::{Change, Discover}, buffer::Buffer,
+    buffer::Buffer,
+    discover::{Change, Discover},
 };
 
 use tower_service::Service;
 
-type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, 
()>, StdError>>>, ()>; 
+type BufferedDirectory =
+    Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>;
 
 pub struct NewCachedDirectory<N>
 where
     N: Registry + Clone + Send + Sync + 'static,
 {
-    inner: CachedDirectory<NewDirectory<N>, RpcInvocation>
+    inner: CachedDirectory<NewDirectory<N>, RpcInvocation>,
 }
 
-
-pub struct CachedDirectory<N, T> 
+pub struct CachedDirectory<N, T>
 where
-    // NewDirectory 
-    N: NewService<T>
+    // NewDirectory
+    N: NewService<T>,
 {
     inner: N,
-    cache: Arc<Mutex<HashMap<String, N::Service>>>
+    cache: Arc<Mutex<HashMap<String, N::Service>>>,
 }
 
-
 pub struct NewDirectory<N> {
     // registry
     inner: N,
 }
 
-
 pub struct Directory<D> {
     directory: HashMap<String, CloneInvoker<TripleInvoker>>,
     discover: D,
     new_invoker: NewInvoker,
 }
 
-
-
-impl<N> NewCachedDirectory<N> 
+impl<N> NewCachedDirectory<N>
 where
     N: Registry + Clone + Send + Sync + 'static,
 {
-
     pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
         tower_layer::layer_fn(|inner: N| {
             NewCachedDirectory {
-                // inner is registry 
-                inner: CachedDirectory::new(NewDirectory::new(inner)), 
+                // inner is registry
+                inner: CachedDirectory::new(NewDirectory::new(inner)),
             }
         })
     }
 }
 
-
 impl<N, T> NewService<T> for NewCachedDirectory<N>
 where
     T: Param<RpcInvocation>,
     // service registry
     N: Registry + Clone + Send + Sync + 'static,
 {
-    type Service = BufferedDirectory; 
+    type Service = BufferedDirectory;
 
     fn new_service(&self, target: T) -> Self::Service {
-    
         self.inner.new_service(target.param())
     }
-} 
- 
- 
-impl<N, T> CachedDirectory<N, T> 
+}
+
+impl<N, T> CachedDirectory<N, T>
 where
-    N: NewService<T>
+    N: NewService<T>,
 {
-
     pub fn new(inner: N) -> Self {
         CachedDirectory {
             inner,
-            cache: Default::default()
+            cache: Default::default(),
         }
-    } 
-} 
-  
- 
-impl<N, T> NewService<T> for CachedDirectory<N, T> 
+    }
+}
+
+impl<N, T> NewService<T> for CachedDirectory<N, T>
 where
     T: Param<RpcInvocation>,
     // NewDirectory
     N: NewService<T>,
     // Buffered directory
-    N::Service: Clone
+    N::Service: Clone,
 {
     type Service = N::Service;
 
@@ -124,44 +126,35 @@ where
         let rpc_invocation = target.param();
         let service_name = rpc_invocation.get_target_service_unique_name();
         let mut cache = self.cache.lock().expect("cached directory lock failed.");
-        let value = cache.get(&service_name).map(|val|val.clone());
+        let value = cache.get(&service_name).map(|val| val.clone());
         match value {
             None => {
                 let new_service = self.inner.new_service(target);
                 cache.insert(service_name, new_service.clone());
                 new_service
-            },
-            Some(value) => value
-        } 
+            }
+            Some(value) => value,
+        }
     }
 }
 
-
 impl<N> NewDirectory<N> {
-
     const MAX_DIRECTORY_BUFFER_SIZE: usize = 16;
 
     pub fn new(inner: N) -> Self {
-        NewDirectory {
-            inner
-        }
+        NewDirectory { inner }
     }
-} 
-
-
+}
 
-impl<N, T> NewService<T> for NewDirectory<N> 
+impl<N, T> NewService<T> for NewDirectory<N>
 where
     T: Param<RpcInvocation>,
     // service registry
-    N: Registry + Clone + Send + Sync + 'static, 
+    N: Registry + Clone + Send + Sync + 'static,
 {
-    type Service = BufferedDirectory; 
+    type Service = BufferedDirectory;
 
-    
- 
     fn new_service(&self, target: T) -> Self::Service {
-        
         let service_name = target.param().get_target_service_unique_name();
 
         let registry = self.inner.clone();
@@ -172,39 +165,35 @@ where
             let receiver = registry.subscribe(service_name).await;
             debug!("discover start!");
             match receiver {
-                Err(e) => {
+                Err(_e) => {
                     // error!("discover stream error: {}", e);
                     debug!("discover stream error");
-                },
-                Ok(mut receiver) => {
-                    loop {
-                        let change = receiver.recv().await;
-                        debug!("receive change: {:?}", change); 
-                        match change {
-                            None => {
-                                debug!("discover stream closed.");
-                                break;
-                            },
-                            Some(change) => {
-                                let _  = tx.send(change).await;
-                            }
+                }
+                Ok(mut receiver) => loop {
+                    let change = receiver.recv().await;
+                    debug!("receive change: {:?}", change);
+                    match change {
+                        None => {
+                            debug!("discover stream closed.");
+                            break;
+                        }
+                        Some(change) => {
+                            let _ = tx.send(change).await;
                         }
                     }
-                }
+                },
             }
+        });
 
-        }); 
-
-        Buffer::new(Directory::new(ReceiverStream::new(rx)), 
Self::MAX_DIRECTORY_BUFFER_SIZE)
-    } 
-
-} 
-
-
-impl<D> Directory<D> { 
+        Buffer::new(
+            Directory::new(ReceiverStream::new(rx)),
+            Self::MAX_DIRECTORY_BUFFER_SIZE,
+        )
+    }
+}
 
+impl<D> Directory<D> {
     pub fn new(discover: D) -> Self {
-  
         Directory {
             directory: Default::default(),
             discover,
@@ -213,13 +202,12 @@ impl<D> Directory<D> {
     }
 }
 
-
-impl<D> Service<()> for Directory<D> 
+impl<D> Service<()> for Directory<D>
 where
     // Discover
-    D: Discover<Key = String> + Unpin + Send, 
-    D::Error: Into<StdError> 
-{ 
+    D: Discover<Key = String> + Unpin + Send,
+    D::Error: Into<StdError>,
+{
     type Response = Vec<CloneInvoker<TripleInvoker>>;
 
     type Error = StdError;
@@ -227,21 +215,22 @@ where
     type Future = future::Ready<Result<Self::Response, Self::Error>>;
 
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        
         loop {
             let pin_discover = Pin::new(&mut self.discover);
-            let change = 
ready!(pin_discover.poll_discover(cx)).transpose().map_err(|e| e.into())?;
+            let change = ready!(pin_discover.poll_discover(cx))
+                .transpose()
+                .map_err(|e| e.into())?;
             match change {
                 Some(Change::Remove(key)) => {
                     debug!("remove key: {}", key);
                     self.directory.remove(&key);
-                },  
+                }
                 Some(Change::Insert(key, _)) => {
                     debug!("insert key: {}", key);
                     let invoker = self.new_invoker.new_service(key.clone());
                     self.directory.insert(key, invoker);
-                },
-                None => { 
+                }
+                None => {
                     debug!("stream closed");
                     return Poll::Ready(Ok(()));
                 }
@@ -250,7 +239,11 @@ where
     }
 
     fn call(&mut self, _: ()) -> Self::Future {
-        let vec = 
self.directory.values().map(|val|val.clone()).collect::<Vec<CloneInvoker<TripleInvoker>>>();
+        let vec = self
+            .directory
+            .values()
+            .map(|val| val.clone())
+            .collect::<Vec<CloneInvoker<TripleInvoker>>>();
         future::ok(vec)
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs
index 5ce2e1f..4de8f89 100644
--- a/dubbo/src/invoker/clone_body.rs
+++ b/dubbo/src/invoker/clone_body.rs
@@ -7,20 +7,20 @@ use std::{
 
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use futures_core::ready;
- 
+
 use http::HeaderMap;
 use http_body::Body;
 use pin_project::pin_project;
 use thiserror::Error;
 
 use crate::StdError;
- 
+
 #[derive(Error, Debug)]
-#[error("buffered body reach max capacity.")] 
+#[error("buffered body reach max capacity.")]
 pub struct ReachMaxCapacityError;
 
 pub struct BufferedBody {
-    shared:  Arc<Mutex<Option<OwnedBufferedBody>>>,
+    shared: Arc<Mutex<Option<OwnedBufferedBody>>>,
     owned: Option<OwnedBufferedBody>,
     replay_body: bool,
     replay_trailers: bool,
@@ -34,10 +34,7 @@ pub struct OwnedBufferedBody {
     buf: InnerBuffer,
 }
 
-
-
 impl BufferedBody {
-
     pub fn new(body: hyper::Body, buf_size: usize) -> Self {
         let size_hint = body.size_hint();
         let is_empty = body.is_end_stream();
@@ -57,11 +54,9 @@ impl BufferedBody {
             size_hint,
         }
     }
-
 }
 
 impl Clone for BufferedBody {
-
     fn clone(&self) -> Self {
         Self {
             shared: self.shared.clone(),
@@ -86,7 +81,6 @@ impl Drop for BufferedBody {
 }
 
 impl Body for BufferedBody {
-
     type Data = BytesData;
     type Error = StdError;
 
@@ -106,14 +100,10 @@ impl Body for BufferedBody {
             data.take().expect("cannot get shared buffered body.")
         });
 
-      
-
         if mut_self.replay_body {
             mut_self.replay_body = false;
             if owned_body.buf.has_remaining() {
-                return Poll::Ready(Some(Ok(BytesData::BufferedBytes(
-                    owned_body.buf.clone(),
-                ))));
+                return Poll::Ready(Some(Ok(BytesData::BufferedBytes(owned_body.buf.clone()))));
             }
 
             if owned_body.buf.is_capped() {
@@ -150,10 +140,8 @@ impl Body for BufferedBody {
         } else {
             owned_body.buf.push_bytes(data.copy_to_bytes(len))
         };
- 
-        Poll::Ready(Some(Ok(BytesData::OriginBytes(data))))
-
 
+        Poll::Ready(Some(Ok(BytesData::OriginBytes(data))))
     }
 
     fn poll_trailers(
@@ -170,7 +158,7 @@ impl Body for BufferedBody {
 
             data.take().expect("cannot get shared buffered body.")
         });
- 
+
         if mut_self.replay_trailers {
             mut_self.replay_trailers = false;
             if let Some(ref trailers) = owned_body.trailers {
@@ -184,7 +172,7 @@ impl Body for BufferedBody {
                 owned_body.trailers = trailers.clone();
                 trailers
             });
-            return Poll::Ready(trailers.map_err(|e|e.into()));
+            return Poll::Ready(trailers.map_err(|e| e.into()));
         }
 
         Poll::Ready(Ok(None))
@@ -195,9 +183,11 @@ impl Body for BufferedBody {
             return true;
         }
 
-        let is_end = self.owned.as_ref()
-                                    .map(|owned|owned.body.is_end_stream())
-                                    .unwrap_or(false);
+        let is_end = self
+            .owned
+            .as_ref()
+            .map(|owned| owned.body.is_end_stream())
+            .unwrap_or(false);
 
         !self.replay_body && !self.replay_trailers && is_end
     }
@@ -205,12 +195,8 @@ impl Body for BufferedBody {
     fn size_hint(&self) -> http_body::SizeHint {
         self.size_hint.clone()
     }
-
-    
 }
 
-
-
 #[derive(Clone)]
 pub struct InnerBuffer {
     bufs: VecDeque<Bytes>,
@@ -328,13 +314,12 @@ pub struct CloneBody(#[pin] BufferedBody);
 
 impl CloneBody {
     pub fn new(inner_body: hyper::Body) -> Self {
-        let inner_body = BufferedBody::new(inner_body, 1024 * 64); 
+        let inner_body = BufferedBody::new(inner_body, 1024 * 64);
         CloneBody(inner_body)
     }
 }
 
-impl Body for CloneBody{
-
+impl Body for CloneBody {
     type Data = BytesData;
 
     type Error = StdError;
@@ -350,7 +335,7 @@ impl Body for CloneBody{
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
-        self.project().0.poll_trailers(cx) 
+        self.project().0.poll_trailers(cx)
     }
 
     fn size_hint(&self) -> http_body::SizeHint {
@@ -358,9 +343,8 @@ impl Body for CloneBody{
     }
 }
 
-
 impl Clone for CloneBody {
     fn clone(&self) -> Self {
         Self(self.0.clone())
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs
index fe621b8..c1fa00d 100644
--- a/dubbo/src/invoker/clone_invoker.rs
+++ b/dubbo/src/invoker/clone_invoker.rs
@@ -1,76 +1,81 @@
-use std::{task::Poll, pin::Pin, mem};
+use std::{mem, pin::Pin, task::Poll};
 
 use dubbo_logger::tracing::debug;
-use futures_core::{Future, TryFuture, ready, future::BoxFuture};
+use futures_core::{future::BoxFuture, ready, Future, TryFuture};
 use futures_util::FutureExt;
 use pin_project::pin_project;
 use thiserror::Error;
-use tokio::{task::JoinHandle, sync::{watch::{Sender, Receiver}, self}};
+use tokio::{
+    sync::{
+        self,
+        watch::{Receiver, Sender},
+    },
+    task::JoinHandle,
+};
 use tokio_util::sync::ReusableBoxFuture;
-use tower::{ServiceExt, buffer::Buffer};
+use tower::{buffer::Buffer, ServiceExt};
 use tower_service::Service;
 
 use crate::StdError;
 
 use super::clone_body::CloneBody;
- 
+
 enum Inner<S> {
     Invalid,
     Ready(S),
     Pending(JoinHandle<Result<S, (S, StdError)>>),
-} 
+}
 
 #[derive(Debug, Error)]
 #[error("the inner service has not got ready yet!")]
 struct InnerServiceNotReadyErr;
 
-
-
 #[pin_project(project = InnerServiceCallingResponseProj)]
 enum InnerServiceCallingResponse<Fut> {
     Call(#[pin] Fut),
-    Fail
+    Fail,
 }
 
-impl<Fut> Future for InnerServiceCallingResponse<Fut> 
+impl<Fut> Future for InnerServiceCallingResponse<Fut>
 where
     Fut: TryFuture,
-    Fut::Error: Into<StdError>
+    Fut::Error: Into<StdError>,
 {
     type Output = Result<Fut::Ok, StdError>;
- 
+
     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> 
Poll<Self::Output> {
-       match self.project() {
-        InnerServiceCallingResponseProj::Call(call) => 
call.try_poll(cx).map_err(Into::into),
-        InnerServiceCallingResponseProj::Fail => 
Poll::Ready(Err(InnerServiceNotReadyErr.into()))
-       }
+        match self.project() {
+            InnerServiceCallingResponseProj::Call(call) => 
call.try_poll(cx).map_err(Into::into),
+            InnerServiceCallingResponseProj::Fail => {
+                Poll::Ready(Err(InnerServiceNotReadyErr.into()))
+            }
+        }
     }
 }
- 
+
 #[derive(Clone)]
 enum ObserveState {
     Ready,
     Pending,
 }
 
-
 struct ReadyService<S> {
     inner: Inner<S>,
-    tx: Sender<ObserveState>
+    tx: Sender<ObserveState>,
 }
 
-
 impl<S> ReadyService<S> {
-
     fn new(inner: S) -> (Self, Receiver<ObserveState>) {
         let (tx, rx) = sync::watch::channel(ObserveState::Ready);
-        let ready_service = Self { inner: Inner::Ready(inner), tx};
+        let ready_service = Self {
+            inner: Inner::Ready(inner),
+            tx,
+        };
         (ready_service, rx)
     }
-
 }
 
-impl<S, Req> Service<Req> for ReadyService<S> 
+impl<S, Req> Service<Req> for ReadyService<S>
 where
     S: Service<Req> + Send + 'static,
     <S as Service<Req>>::Error: Into<StdError>,
@@ -85,16 +90,14 @@ where
         loop {
             match mem::replace(&mut self.inner, Inner::Invalid) {
                 Inner::Ready(mut svc) => {
-                    let poll_ready  = svc.poll_ready(cx);
+                    let poll_ready = svc.poll_ready(cx);
                     match poll_ready {
                         Poll::Pending => {
                             self.inner = Inner::Pending(tokio::spawn(async 
move {
                                 let poll_ready = svc.ready().await;
                                 match poll_ready {
                                     Ok(_) => Ok(svc),
-                                    Err(err) => {
-                                        Err((svc, err.into()))
-                                    }
+                                    Err(err) => Err((svc, err.into())),
                                 }
                             }));
 
@@ -108,17 +111,17 @@ where
                             return Poll::Ready(ret.map_err(Into::into));
                         }
                     }
-                },
+                }
                 Inner::Pending(mut join_handle) => {
                     if let Poll::Ready(res) = join_handle.poll_unpin(cx) {
                         let (svc, res) = match res {
                             Err(join_err) => panic!("ReadyService panicked: 
{join_err}"),
                             Ok(Err((svc, err))) => (svc, 
Poll::Ready(Err(err))),
-                            Ok(Ok(svc)) => (svc, Poll::Ready(Ok(())))
+                            Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))),
                         };
 
                         self.inner = Inner::Ready(svc);
-                        
+
                         let _ = self.tx.send(ObserveState::Ready);
                         return res;
                     } else {
@@ -127,31 +130,29 @@ where
                         let _ = self.tx.send(ObserveState::Pending);
                         return Poll::Pending;
                     }
-                    
-                },
-                Inner::Invalid => panic!("ReadyService panicked: inner state 
is invalid")
+                }
+                Inner::Invalid => panic!("ReadyService panicked: inner state 
is invalid"),
             }
         }
     }
 
     fn call(&mut self, req: Req) -> Self::Future {
-       match self.inner {
-        Inner::Ready(ref mut svc) => 
InnerServiceCallingResponse::Call(svc.call(req)),
-        _ => InnerServiceCallingResponse::Fail
-       }
+        match self.inner {
+            Inner::Ready(ref mut svc) => 
InnerServiceCallingResponse::Call(svc.call(req)),
+            _ => InnerServiceCallingResponse::Fail,
+        }
     }
 }
 
-
 impl<S> Drop for ReadyService<S> {
     fn drop(&mut self) {
-       if let Inner::Pending(ref handler) = self.inner {
+        if let Inner::Pending(ref handler) = self.inner {
             handler.abort();
-       }
+        }
     }
 }
 
-pub struct CloneInvoker<Inv> 
+pub struct CloneInvoker<Inv>
 where
     Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
@@ -163,40 +164,44 @@ where
     polling: bool,
 }
 
-impl<Inv> CloneInvoker<Inv> 
+impl<Inv> CloneInvoker<Inv>
 where
     Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
 {
-
     const MAX_INVOKER_BUFFER_SIZE: usize = 16;
-    
+
     pub fn new(invoker: Inv) -> Self {
-        
         let (ready_service, rx) = ReadyService::new(invoker);
 
-        let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> = 
Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE);
+        let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> =
+            Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE);
 
-        Self { inner: buffer, rx, polling: false, poll: 
ReusableBoxFuture::new(futures::future::pending()) }
+        Self {
+            inner: buffer,
+            rx,
+            polling: false,
+            poll: ReusableBoxFuture::new(futures::future::pending()),
+        }
     }
 }
 
-impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv> 
+impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv>
 where
     Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
 {
     type Response = Inv::Response;
- 
+
     type Error = StdError;
- 
+
     type Future = BoxFuture<'static, Result<Self::Response, StdError>>;
 
     fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> 
Poll<Result<(), Self::Error>> {
         loop {
-            if !self.polling { 
+            if !self.polling {
                 match self.rx.borrow().clone() {
                     ObserveState::Ready => return self.inner.poll_ready(cx),
                     ObserveState::Pending => {
@@ -206,7 +211,7 @@ where
                             loop {
                                 let current_state = 
rx.borrow_and_update().clone();
                                 if matches!(current_state, 
ObserveState::Ready) {
-                                    return current_state;                      
      
+                                    return current_state;
                                 }
                                 if let Err(_) = rx.changed().await {
                                     debug!("the readyService has already 
shutdown!");
@@ -216,7 +221,6 @@ where
                         });
                     }
                 }
-    
             }
 
             let state = ready!(self.poll.poll_unpin(cx));
@@ -235,14 +239,18 @@ where
     }
 }
 
-
-impl<Inv> Clone for CloneInvoker<Inv> 
+impl<Inv> Clone for CloneInvoker<Inv>
 where
     Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
 {
     fn clone(&self) -> Self {
-        Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, 
poll: ReusableBoxFuture::new(futures::future::pending())}
+        Self {
+            inner: self.inner.clone(),
+            rx: self.rx.clone(),
+            polling: false,
+            poll: ReusableBoxFuture::new(futures::future::pending()),
+        }
     }
-}  
\ No newline at end of file
+}
diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs
index a8179ee..92b8b46 100644
--- a/dubbo/src/invoker/mod.rs
+++ b/dubbo/src/invoker/mod.rs
@@ -1,14 +1,12 @@
 use dubbo_base::Url;
 
-use crate::{codegen::TripleInvoker, svc::NewService, 
invoker::clone_invoker::CloneInvoker};
+use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, 
svc::NewService};
 
 pub mod clone_body;
 pub mod clone_invoker;
 
-
 pub struct NewInvoker;
 
-
 impl NewService<String> for NewInvoker {
     type Service = CloneInvoker<TripleInvoker>;
 
@@ -18,4 +16,4 @@ impl NewService<String> for NewInvoker {
         let url = Url::from_url(&url).unwrap();
         CloneInvoker::new(TripleInvoker::new(url))
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 252e01c..d397b42 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -18,20 +18,20 @@
 pub mod cluster;
 pub mod codegen;
 pub mod context;
+pub mod directory;
 pub mod filter;
 mod framework;
 pub mod invocation;
+pub mod invoker;
+pub mod loadbalancer;
+pub mod param;
 pub mod protocol;
 pub mod registry;
+pub mod route;
 pub mod status;
+pub mod svc;
 pub mod triple;
 pub mod utils;
-pub mod directory;
-pub mod route;
-pub mod loadbalancer; 
-pub mod invoker;
-pub mod param;
-pub mod svc;
 
 use http_body::Body;
 use std::{future::Future, pin::Pin};
diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs
index 334350e..4e26781 100644
--- a/dubbo/src/loadbalancer/mod.rs
+++ b/dubbo/src/loadbalancer/mod.rs
@@ -1,16 +1,20 @@
 use futures_core::future::BoxFuture;
-use tower::ServiceExt;
-use tower::discover::ServiceList;
+use tower::{discover::ServiceList, ServiceExt};
 use tower_service::Service;
 
-use crate::invoker::clone_body::CloneBody;
-use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker};
+use crate::{
+    codegen::RpcInvocation,
+    invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker},
+    param::Param,
+    svc::NewService,
+    StdError,
+};
 
 use crate::protocol::triple::triple_invoker::TripleInvoker;
- 
+
 pub struct NewLoadBalancer<N> {
     inner: N,
-} 
+}
 
 #[derive(Clone)]
 pub struct LoadBalancer<S> {
@@ -18,80 +22,74 @@ pub struct LoadBalancer<S> {
 }
 
 impl<N> NewLoadBalancer<N> {
-
-    pub fn layer() -> impl tower_layer::Layer<N, Service = Self>{
-
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
         tower_layer::layer_fn(|inner| {
-
             NewLoadBalancer {
-                inner // NewRoutes
+                inner, // NewRoutes
             }
         })
     }
 }
 
-impl<N, T> NewService<T> for NewLoadBalancer<N> 
+impl<N, T> NewService<T> for NewLoadBalancer<N>
 where
     T: Param<RpcInvocation> + Clone,
     // NewRoutes
-    N: NewService<T>, 
+    N: NewService<T>,
 {
-
     type Service = LoadBalancer<N::Service>;
 
     fn new_service(&self, target: T) -> Self::Service {
         // Routes service
         let svc = self.inner.new_service(target);
 
-        LoadBalancer {
-            inner: svc,
-        }
-        
+        LoadBalancer { inner: svc }
     }
 }
 
-impl<N>  Service<http::Request<CloneBody>> for LoadBalancer<N> 
+impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N>
 where
     // Routes service
     N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone,
     N::Error: Into<StdError> + Send,
-    N::Future: Send + 'static, 
+    N::Future: Send + 'static,
 {
-    
     type Response = <CloneInvoker<TripleInvoker> as Service<http::Request<CloneBody>>>::Response;
 
     type Error = StdError;
 
     type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
 
-    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
+    fn poll_ready(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
         self.inner.poll_ready(cx).map_err(Into::into)
-    
     }
 
     fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
-        let routes  = self.inner.call(());
+        let routes = self.inner.call(());
 
         let fut = async move {
             let routes = routes.await;
 
             let routes: Vec<CloneInvoker<TripleInvoker>> = match routes {
                 Err(e) => return Err(Into::<StdError>::into(e)),
-                Ok(routes) => routes
+                Ok(routes) => routes,
             };
-  
-            let service_list: Vec<_> = routes.into_iter().map(|invoker| {      
      
-                tower::load::Constant::new(invoker, 1)
-            }).collect();
+
+            let service_list: Vec<_> = routes
+                .into_iter()
+                .map(|invoker| tower::load::Constant::new(invoker, 1))
+                .collect();
 
             let service_list = ServiceList::new(service_list);
-            
+
             let p2c = tower::balance::p2c::Balance::new(service_list);
- 
 
             p2c.oneshot(req).await
         };
- 
+
         Box::pin(fut)
     }
 }
diff --git a/dubbo/src/param.rs b/dubbo/src/param.rs
index b57f98e..bef5041 100644
--- a/dubbo/src/param.rs
+++ b/dubbo/src/param.rs
@@ -1,12 +1,9 @@
 pub trait Param<T> {
-    
     fn param(&self) -> T;
 }
 
-
 impl<T: ToOwned> Param<T::Owned> for T {
-
     fn param(&self) -> T::Owned {
-       self.to_owned()
+        self.to_owned()
     }
 }
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 9d35a52..7dbf1f3 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-use std::{
-    task::{Context, Poll},
-};
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use aws_smithy_http::body::SdkBody;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 71eae90..c8451e3 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -16,14 +16,17 @@
  */
 
 use dubbo_base::Url;
-use http::{Uri, HeaderValue};
+use http::{HeaderValue, Uri};
 use std::{
     fmt::{Debug, Formatter},
     str::FromStr,
 };
 use tower_service::Service;
 
-use crate::{triple::transport::{connection::Connection, self}, invoker::clone_body::CloneBody};
+use crate::{
+    invoker::clone_body::CloneBody,
+    triple::transport::{self, connection::Connection},
+};
 
 pub struct TripleInvoker {
     url: Url,
@@ -47,18 +50,19 @@ impl Debug for TripleInvoker {
 }
 
 impl TripleInvoker {
-    pub fn map_request(
-        &self,
-        req: http::Request<CloneBody>,
-    ) -> http::Request<CloneBody> {
-       
+    pub fn map_request(&self, req: http::Request<CloneBody>) -> http::Request<CloneBody> {
         let (parts, body) = req.into_parts();
 
         let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap();
 
-        let authority  = self.url.clone().get_ip_port();
-        
-        let uri = Uri::builder().scheme("http").authority(authority).path_and_query(path_and_query).build().unwrap();
+        let authority = self.url.clone().get_ip_port();
+
+        let uri = Uri::builder()
+            .scheme("http")
+            .authority(authority)
+            .path_and_query(path_and_query)
+            .build()
+            .unwrap();
 
         let mut req = hyper::Request::builder()
             .version(http::Version::HTTP_2)
@@ -99,12 +103,12 @@ impl TripleInvoker {
             HeaderValue::from_static("dubbo-rust/0.1.0"),
         );
         // if let Some(_encoding) = self.send_compression_encoding {
-            
+
         // }
 
         req.headers_mut()
-                .insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
-            
+            .insert("grpc-encoding", http::HeaderValue::from_static("gzip"));
+
         req.headers_mut().insert(
             "grpc-accept-encoding",
             http::HeaderValue::from_static("gzip"),
@@ -137,7 +141,10 @@ impl Service<http::Request<CloneBody>> for TripleInvoker {
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), Self::Error>> {
-        <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(&mut self.conn, cx)
+        <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(
+            &mut self.conn,
+            cx,
+        )
     }
 
     fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
@@ -146,4 +153,3 @@ impl Service<http::Request<CloneBody>> for TripleInvoker {
         self.conn.call(req)
     }
 }
-
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index ab2297f..d8ff6ef 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -18,9 +18,9 @@
 #![allow(unused_variables, dead_code, missing_docs)]
 pub mod integration;
 pub mod memory_registry;
+pub mod n_registry;
 pub mod protocol;
 pub mod types;
-pub mod n_registry;
 
 use std::{
     fmt::{Debug, Formatter},
@@ -29,7 +29,6 @@ use std::{
 
 use dubbo_base::Url;
 
-
 pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
 pub trait Registry {
     fn register(&mut self, url: Url) -> Result<(), crate::StdError>;
diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs
index 9b6dca5..928c9b3 100644
--- a/dubbo/src/registry/n_registry.rs
+++ b/dubbo/src/registry/n_registry.rs
@@ -2,19 +2,17 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use dubbo_base::Url;
-use tokio::sync::mpsc::{Receiver, channel};
+use tokio::sync::mpsc::{channel, Receiver};
 use tower::discover::Change;
 
-
 use crate::StdError;
 
 type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>;
 
 #[async_trait]
 pub trait Registry {
-
     async fn register(&self, url: Url) -> Result<(), StdError>;
-    
+
     async fn unregister(&self, url: Url) -> Result<(), StdError>;
 
     // todo service_name change to url
@@ -25,31 +23,29 @@ pub trait Registry {
 
 #[derive(Clone)]
 pub struct ArcRegistry {
-    inner: Arc<dyn Registry + Send + Sync + 'static>
+    inner: Arc<dyn Registry + Send + Sync + 'static>,
 }
 
-
 pub enum RegistryComponent {
     NacosRegistry,
     ZookeeperRegistry,
     StaticRegistry(StaticRegistry),
 }
 
-
 pub struct StaticRegistry {
-    urls: Vec<Url>
+    urls: Vec<Url>,
 }
 
 impl ArcRegistry {
-
     pub fn new(registry: impl Registry + Send + Sync + 'static) -> Self {
-        Self { inner: Arc::new(registry) }
+        Self {
+            inner: Arc::new(registry),
+        }
     }
 }
 
 #[async_trait]
 impl Registry for ArcRegistry {
-    
     async fn register(&self, url: Url) -> Result<(), StdError> {
         self.inner.register(url).await
     }
@@ -67,9 +63,6 @@ impl Registry for ArcRegistry {
     }
 }
 
-
-
-
 #[async_trait]
 impl Registry for RegistryComponent {
     async fn register(&self, url: Url) -> Result<(), StdError> {
@@ -93,17 +86,12 @@ impl Registry for RegistryComponent {
     }
 }
 
-
 impl StaticRegistry {
-
     pub fn new(urls: Vec<Url>) -> Self {
-        Self {
-            urls
-        }
+        Self { urls }
     }
 }
 
-
 #[async_trait]
 impl Registry for StaticRegistry {
     async fn register(&self, url: Url) -> Result<(), StdError> {
@@ -119,7 +107,7 @@ impl Registry for StaticRegistry {
         for url in self.urls.iter() {
             let change = Ok(Change::Insert(url.to_url(), ()));
             tx.send(change).await?;
-        }      
+        }
 
         Ok(rx)
     }
@@ -127,4 +115,4 @@ impl Registry for StaticRegistry {
     async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
         todo!()
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs
index cff3cce..c244864 100644
--- a/dubbo/src/route/mod.rs
+++ b/dubbo/src/route/mod.rs
@@ -1,42 +1,51 @@
 use std::pin::Pin;
 
 use dubbo_logger::tracing::debug;
-use futures_core::{Future, ready};
+use futures_core::{ready, Future};
 use futures_util::{future::Ready, FutureExt, TryFutureExt};
-use tower::{util::FutureService, buffer::Buffer};
+use tower::{buffer::Buffer, util::FutureService};
 use tower_service::Service;
-  
-use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker};
+
+use crate::{
+    codegen::{RpcInvocation, TripleInvoker},
+    invoker::clone_invoker::CloneInvoker,
+    param::Param,
+    svc::NewService,
+    StdError,
+};
 
 pub struct NewRoutes<N> {
     inner: N,
-} 
-
+}
 
 pub struct NewRoutesFuture<S, T> {
     inner: RoutesFutureInnerState<S>,
     target: T,
 }
 
-
 pub enum RoutesFutureInnerState<S> {
     Service(S),
-    Future(Pin<Box<dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>> + Send + 'static>>),
+    Future(
+        Pin<
+            Box<
+                dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>>
+                    + Send
+                    + 'static,
+            >,
+        >,
+    ),
     Ready(Vec<CloneInvoker<TripleInvoker>>),
 }
 
- 
 #[derive(Clone)]
 pub struct Routes<T> {
     target: T,
-    invokers: Vec<CloneInvoker<TripleInvoker>>
+    invokers: Vec<CloneInvoker<TripleInvoker>>,
 }
 
 impl<N> NewRoutes<N> {
     pub fn new(inner: N) -> Self {
-        Self {
-            inner, 
-        }
+        Self { inner }
     }
 }
 
@@ -44,40 +53,39 @@ impl<N> NewRoutes<N> {
     const MAX_ROUTE_BUFFER_SIZE: usize = 16;
 
     pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
-        tower_layer::layer_fn(|inner: N| {
-            NewRoutes::new(inner)
-        })
+        tower_layer::layer_fn(|inner: N| NewRoutes::new(inner))
     }
 }
 
-
-impl<N, T> NewService<T> for NewRoutes<N> 
+impl<N, T> NewService<T> for NewRoutes<N>
 where
-    T: Param<RpcInvocation> + Clone + Send + Unpin + 'static, 
+    T: Param<RpcInvocation> + Clone + Send + Unpin + 'static,
     // NewDirectory
     N: NewService<T>,
-    // Directory   
+    // Directory
     N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin + Send + 'static,
-    <N::Service as Service<()>>::Error: Into<StdError>, 
+    <N::Service as Service<()>>::Error: Into<StdError>,
     <N::Service as Service<()>>::Future: Send + 'static,
-{ 
-
-    type Service = Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>;
+{
+    type Service =
+        Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>;
 
     fn new_service(&self, target: T) -> Self::Service {
         let inner = self.inner.new_service(target.clone());
 
-        Buffer::new(FutureService::new(NewRoutesFuture {
-            inner: RoutesFutureInnerState::Service(inner),
-            target,
-        }), Self::MAX_ROUTE_BUFFER_SIZE)
+        Buffer::new(
+            FutureService::new(NewRoutesFuture {
+                inner: RoutesFutureInnerState::Service(inner),
+                target,
+            }),
+            Self::MAX_ROUTE_BUFFER_SIZE,
+        )
     }
 }
 
-
-impl<N, T> Future for NewRoutesFuture<N, T> 
+impl<N, T> Future for NewRoutesFuture<N, T>
 where
-    T: Param<RpcInvocation> + Clone + Unpin, 
+    T: Param<RpcInvocation> + Clone + Unpin,
     // Directory
     N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin,
     N::Error: Into<StdError>,
@@ -85,8 +93,10 @@ where
 {
     type Output = Result<Routes<T>, StdError>;
 
-    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
-
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
         let this = self.get_mut();
 
         loop {
@@ -94,14 +104,14 @@ where
                 RoutesFutureInnerState::Service(ref mut service) => {
                     debug!("RoutesFutureInnerState::Service");
                     let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?;
-                    let fut = service.call(()).map_err(|e|e.into()).boxed();
+                    let fut = service.call(()).map_err(|e| e.into()).boxed();
                     this.inner = RoutesFutureInnerState::Future(fut);
-                },
+                }
                 RoutesFutureInnerState::Future(ref mut futures) => {
                     debug!("RoutesFutureInnerState::Future");
                     let invokers = ready!(futures.as_mut().poll(cx))?;
                     this.inner = RoutesFutureInnerState::Ready(invokers);
-                },
+                }
                 RoutesFutureInnerState::Ready(ref invokers) => {
                     debug!("RoutesFutureInnerState::Ready");
                     let target = this.target.clone();
@@ -109,32 +119,32 @@ where
                         invokers: invokers.clone(),
                         target,
                     }));
-                },
+                }
             }
         }
-        
     }
 }
 
-
-
-impl<T> Service<()> for Routes<T> 
+impl<T> Service<()> for Routes<T>
 where
-    T: Param<RpcInvocation> + Clone, 
-{ 
+    T: Param<RpcInvocation> + Clone,
+{
     type Response = Vec<CloneInvoker<TripleInvoker>>;
- 
+
     type Error = StdError;
-  
-    type Future = Ready<Result<Self::Response, Self::Error>>; 
 
-    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
+    type Future = Ready<Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(
+        &mut self,
+        _: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
         std::task::Poll::Ready(Ok(()))
     }
 
     fn call(&mut self, _: ()) -> Self::Future {
         // some router operator
         // if new_invokers changed, send new invokers to routes_rx after router operator
-        futures_util::future::ok(self.invokers.clone()) 
+        futures_util::future::ok(self.invokers.clone())
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs
index 56dc330..df4c071 100644
--- a/dubbo/src/svc.rs
+++ b/dubbo/src/svc.rs
@@ -1,23 +1,19 @@
-use std::{sync::Arc, pin::Pin, marker::PhantomData};
+use std::{marker::PhantomData, sync::Arc};
+
+
+
 
-use futures_core::Future;
-use tower::ServiceExt;
-use tower_service::Service;
 
 pub trait NewService<T> {
-    
     type Service;
 
     fn new_service(&self, target: T) -> Self::Service;
-
 }
 
-
 pub struct ArcNewService<T, S> {
     inner: Arc<dyn NewService<T, Service = S> + Send + Sync>,
 }
 
-
 impl<T, S> ArcNewService<T, S> {
     pub fn layer<N>() -> impl tower_layer::Layer<N, Service = Self> + Clone + Copy
     where
@@ -57,23 +53,19 @@ impl<T, S> NewService<T> for ArcNewService<T, S> {
 // inner: Box<dyn Service<T, Response = U, Error = E, Future = Pin<Box<dyn Future<Output = Result<U, E>> + Send>>> + Send>,
 pub struct BoxedService<N, R> {
     inner: N,
-    _mark: PhantomData<R>
+    _mark: PhantomData<R>,
 }
 
 impl<R, N> BoxedService<N, R> {
-
-    pub fn layer() -> impl tower_layer::Layer<N, Service = Self>{
-        tower_layer::layer_fn(|inner: N| {
-            Self {
-                inner,
-                _mark: PhantomData
-            }
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
+        tower_layer::layer_fn(|inner: N| Self {
+            inner,
+            _mark: PhantomData,
         })
     }
 }
 
-
-// impl<T, R, N> NewService<T> for BoxedService<N, R> 
+// impl<T, R, N> NewService<T> for BoxedService<N, R>
 // where
 //     N: NewService<T>,
 //     N::Service: Service<R> + Send,
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index ed08021..c4e6e62 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -15,11 +15,15 @@
  * limitations under the License.
  */
 
-
 use std::sync::Arc;
 
 use crate::{
-    utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::NewRoutes, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody,
+    cluster::NewCluster,
+    directory::NewCachedDirectory,
+    loadbalancer::NewLoadBalancer,
+    registry::n_registry::{ArcRegistry, RegistryComponent, StaticRegistry},
+    route::NewRoutes,
+    utils::boxed_clone::BoxCloneService,
 };
 
 use aws_smithy_http::body::SdkBody;
@@ -29,7 +33,6 @@ use tower::ServiceBuilder;
 pub type ClientBoxService =
     BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
 
-
 pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>;
 
 #[derive(Default)]
@@ -56,7 +59,9 @@ impl ClientBuilder {
         Self {
             timeout: None,
             connector: "",
-            registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))),
+            registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(
+                StaticRegistry::new(vec![Url::from_url(host).unwrap()]),
+            ))),
             direct: true,
             host: host.to_string(),
         }
@@ -70,24 +75,23 @@ impl ClientBuilder {
     }
 
     pub fn with_registry(self, registry: RegistryComponent) -> Self {
-        Self { 
-            registry: Some(ArcRegistry::new(registry)), 
+        Self {
+            registry: Some(ArcRegistry::new(registry)),
             ..self
         }
     }
 
     pub fn with_host(self, host: &'static str) -> Self {
         Self {
-            registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))),
+            registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(
+                StaticRegistry::new(vec![Url::from_url(host).unwrap()]),
+            ))),
             ..self
         }
     }
 
     pub fn with_connector(self, connector: &'static str) -> Self {
-        Self {
-            connector,
-            ..self
-        }
+        Self { connector, ..self }
     }
 
     pub fn with_direct(self, direct: bool) -> Self {
@@ -95,16 +99,14 @@ impl ClientBuilder {
     }
 
     pub fn build(mut self) -> ServiceMK {
-  
-
         let registry = self.registry.take().expect("registry must not be empty");
 
         let mk_service = ServiceBuilder::new()
-                .layer(NewCluster::layer())
-                .layer(NewLoadBalancer::layer())
-                .layer(NewRoutes::layer())
-                .layer(NewCachedDirectory::layer())
-                .service(registry);
+            .layer(NewCluster::layer())
+            .layer(NewLoadBalancer::layer())
+            .layer(NewRoutes::layer())
+            .layer(NewCachedDirectory::layer())
+            .service(registry);
 
         Arc::new(mk_service)
     }
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 9bcf144..377dc43 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 use futures_util::{future, stream, StreamExt, TryStreamExt};
 
 use aws_smithy_http::body::SdkBody;
@@ -25,9 +24,9 @@ use tower_service::Service;
 use super::builder::{ClientBuilder, ServiceMK};
 use crate::codegen::RpcInvocation;
 
-use crate::svc::NewService;
 use crate::{
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
+    svc::NewService,
     triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
 };
 
@@ -148,15 +147,12 @@ impl TripleClient {
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
 
-
         let mut invoker = self.mk.new_service(invocation);
 
-
         let request = http::Request::builder()
             .header("path", path.to_string())
-            .body(body).unwrap();
-
-       
+            .body(body)
+            .unwrap();
 
         let response = invoker
             .call(request)
@@ -211,14 +207,13 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
-      
-        let mut invoker = self.mk.new_service(invocation);
 
+        let mut invoker = self.mk.new_service(invocation);
 
         let request = http::Request::builder()
             .header("path", path.to_string())
-            .body(body).unwrap();
-
+            .body(body)
+            .unwrap();
 
         let response = invoker
             .call(request)
@@ -259,12 +254,10 @@ impl TripleClient {
         let body = hyper::Body::wrap_stream(en);
         let mut invoker = self.mk.new_service(invocation);
 
-
         let request = http::Request::builder()
             .header("path", path.to_string())
-            .body(body).unwrap();
-
-
+            .body(body)
+            .unwrap();
 
         // let mut conn = Connection::new().with_host(http_uri);
         let response = invoker
@@ -322,11 +315,10 @@ impl TripleClient {
         let body = hyper::Body::wrap_stream(en);
         let mut invoker = self.mk.new_service(invocation);
 
-
         let request = http::Request::builder()
             .header("path", path.to_string())
-            .body(body).unwrap();
-
+            .body(body)
+            .unwrap();
 
         let response = invoker
             .call(request)
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index 3bbaa17..cb0b9d7 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -18,9 +18,15 @@
 use hyper::client::{conn::Builder, service::Connect};
 use tower_service::Service;
 
-use crate::{boxed, triple::transport::connector::get_connector, StdError, invoker::clone_body::CloneBody};
+use crate::{
+    boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector, StdError,
+};
 
-type HyperConnect = Connect<crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>, CloneBody, http::Uri>;
+type HyperConnect = Connect<
+    crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>,
+    CloneBody,
+    http::Uri,
+>;
 
 pub struct Connection {
     host: hyper::Uri,
@@ -65,13 +71,10 @@ impl Connection {
         let hyper_connect: HyperConnect = Connect::new(get_connector(self.connector), builder);
         self.connect = Some(hyper_connect);
         self
-
     }
 }
 
 impl Service<http::Request<CloneBody>> for Connection {
-
-
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
@@ -85,30 +88,28 @@ impl Service<http::Request<CloneBody>> for Connection {
         match self.connect {
             None => {
                 panic!("connection must be built before use")
-            },
-            Some(ref mut connect) => {
-                connect.poll_ready(cx).map_err(|e|e.into())
             }
+            Some(ref mut connect) => connect.poll_ready(cx).map_err(|e| e.into()),
         }
     }
 
     fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
-      
         match self.connect {
             None => {
                 panic!("connection must be built before use")
-            },
+            }
             Some(ref mut connect) => {
                 let uri = self.host.clone();
                 let call_fut = connect.call(uri);
                 let fut = async move {
-                let mut con = call_fut.await.unwrap();
-                con.call(req).await
-                    .map_err(|err| err.into())
-                    .map(|res| res.map(boxed))
+                    let mut con = call_fut.await.unwrap();
+                    con.call(req)
+                        .await
+                        .map_err(|err| err.into())
+                        .map(|res| res.map(boxed))
                 };
 
-                return Box::pin(fut)
+                return Box::pin(fut);
             }
         }
     }

Reply via email to