This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch refact/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/refact/cluster by this push:
     new a0c7516  Tst: local test passed (#166)
a0c7516 is described below

commit a0c751699d01db8e8fab653243186451e00e18e9
Author: 毛文超 <ad...@onew.me>
AuthorDate: Thu Nov 23 10:46:37 2023 +0800

    Tst: local test passed (#166)
    
    * Tst: local test passed
    
    * Enhance: remove unnecessary key
    
    * Enhance: add BUFFER SIZE const variable
---
 dubbo-build/src/client.rs                         |   2 +-
 dubbo/src/cluster/mod.rs                          |  16 +-
 dubbo/src/directory/mod.rs                        | 148 ++++++-----------
 dubbo/src/{cluster => invoker}/clone_body.rs      |  50 ++----
 dubbo/src/{cluster => invoker}/clone_invoker.rs   |  30 ++--
 dubbo/src/invoker/mod.rs                          |  76 ++-------
 dubbo/src/loadbalancer/mod.rs                     |  37 ++---
 dubbo/src/protocol/triple/triple_invoker.rs       | 105 ++++++++++--
 dubbo/src/registry/n_registry.rs                  |  21 ++-
 dubbo/src/route/mod.rs                            | 192 ++++++++--------------
 dubbo/src/triple/client/builder.rs                |   6 +-
 dubbo/src/triple/client/triple.rs                 |   4 +
 dubbo/src/triple/transport/connection.rs          |  72 ++++----
 examples/echo/src/generated/grpc.examples.echo.rs |   2 +-
 examples/greeter/src/greeter/client.rs            |  12 +-
 examples/greeter/src/greeter/server.rs            |   8 +-
 16 files changed, 351 insertions(+), 430 deletions(-)

diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs
index 418bc10..b222f75 100644
--- a/dubbo-build/src/client.rs
+++ b/dubbo-build/src/client.rs
@@ -65,7 +65,7 @@ pub fn generate<T: Service>(
 
             #service_doc
             #(#struct_attributes)*
-            #[derive(Debug, Clone, Default)]
+            #[derive(Clone)]
             pub struct #service_ident {
                 inner: TripleClient,
             }
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 47394e2..0b1654a 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -19,12 +19,10 @@
 use http::Request;
 use tower_service::Service;
 
-use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param};
+use crate::{codegen::RpcInvocation, svc::NewService, param::Param, 
invoker::clone_body::CloneBody};
+
+use self::failover::Failover;
 
-use self::{failover::Failover, clone_body::CloneBody};
- 
-mod clone_body;
-mod clone_invoker;
 mod failover;
 
 pub struct NewCluster<N> {
@@ -65,11 +63,9 @@ where
     }
 }
   
-impl<S, B> Service<Request<B>> for Cluster<S> 
+impl<S> Service<Request<hyper::Body>> for Cluster<S> 
 where
-    S: Service<Request<CloneBody<B>>>,
-    B: http_body::Body + Unpin,
-    B::Error: Into<StdError>,
+    S: Service<Request<CloneBody>>,
 {
 
     type Response = S::Response;
@@ -82,7 +78,7 @@ where
         self.inner.poll_ready(cx)
     }
  
-    fn call(&mut self, req: Request<B>) -> Self::Future {
+    fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
         let (parts, body) = req.into_parts();
         let clone_body = CloneBody::new(body);
         let req = Request::from_parts(parts, clone_body); 
diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs
index 0861d32..a4cf466 100644
--- a/dubbo/src/directory/mod.rs
+++ b/dubbo/src/directory/mod.rs
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
- use core::panic;
 use std::{
-    hash::Hash,
-    task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex},
+    task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, pin::Pin,
 };
     
-use crate::{StdError, codegen::RpcInvocation, invocation::Invocation, 
registry::n_registry::Registry, invoker::NewInvoker, svc::NewService, 
param::Param};
-use futures_util::future::{poll_fn, self};
-use tokio::{sync::{watch, Notify, mpsc::channel}, select};
+use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, 
invocation::Invocation, registry::n_registry::Registry, 
invoker::{NewInvoker,clone_invoker::CloneInvoker}, svc::NewService, 
param::Param};
+use dubbo_logger::tracing::debug;
+use futures_core::ready;
+use futures_util::future;
+use tokio::sync::mpsc::channel;
 use tokio_stream::wrappers::ReceiverStream;
 use tower::{
     discover::{Change, Discover}, buffer::Buffer,
@@ -31,7 +31,7 @@ use tower::{
 
 use tower_service::Service;
 
-type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, 
NewInvoker>, StdError>>>, ()>;
+type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, 
()>, StdError>>>, ()>; 
 
 pub struct NewCachedDirectory<N>
 where
@@ -57,14 +57,10 @@ pub struct NewDirectory<N> {
 }
 
 
-
-#[derive(Clone)]
-pub struct Directory<D> 
-where
-    D: Discover
-{
-    rx: watch::Receiver<Vec<D::Service>>,
-    close: Arc<Notify>
+pub struct Directory<D> {
+    directory: HashMap<String, CloneInvoker<TripleInvoker>>,
+    discover: D,
+    new_invoker: NewInvoker,
 }
 
 
@@ -143,6 +139,8 @@ where
 
 impl<N> NewDirectory<N> {
 
+    const MAX_DIRECTORY_BUFFER_SIZE: usize = 16;
+
     pub fn new(inner: N) -> Self {
         NewDirectory {
             inner
@@ -150,6 +148,8 @@ impl<N> NewDirectory<N> {
     }
 } 
 
+
+
 impl<N, T> NewService<T> for NewDirectory<N> 
 where
     T: Param<RpcInvocation>,
@@ -157,6 +157,8 @@ where
     N: Registry + Clone + Send + Sync + 'static, 
 {
     type Service = BufferedDirectory; 
+
+    
  
     fn new_service(&self, target: T) -> Self::Service {
         
@@ -164,26 +166,27 @@ where
 
         let registry = self.inner.clone();
 
-        let (tx, rx) = channel(1024);
+        let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE);
 
         tokio::spawn(async move {
-
             let receiver = registry.subscribe(service_name).await;
+            debug!("discover start!");
             match receiver {
                 Err(e) => {
                     // error!("discover stream error: {}", e);
-                    
+                    debug!("discover stream error");
                 },
                 Ok(mut receiver) => {
                     loop {
                         let change = receiver.recv().await;
+                        debug!("receive change: {:?}", change); 
                         match change {
                             None => {
-                                // debug!("discover stream closed.");
+                                debug!("discover stream closed.");
                                 break;
                             },
                             Some(change) => {
-                                let _  = tx.send(change);
+                                let _  = tx.send(change).await;
                             }
                         }
                     }
@@ -192,71 +195,20 @@ where
 
         }); 
 
-        Buffer::new(Directory::new(ReceiverStream::new(rx)), 1024)
+        Buffer::new(Directory::new(ReceiverStream::new(rx)), 
Self::MAX_DIRECTORY_BUFFER_SIZE)
     } 
 
 } 
 
 
-impl<D> Directory<D> 
-where
-    // Discover
-    D: Discover + Unpin + Send + 'static,
-    // the key may be dubbo url
-    D::Key: Hash + Eq + Clone + Send,
-    // invoker new service
-    D::Service: NewService<()> + Clone + Send + Sync,
-{ 
+impl<D> Directory<D> { 
 
     pub fn new(discover: D) -> Self {
   
-        let mut discover = Box::pin(discover);
-
-        let (tx, rx) = watch::channel(Vec::new());
-        let close = Arc::new(Notify::new());
-        let close_clone = close.clone();
-     
-        tokio::spawn(async move {
-            let mut cache: HashMap<D::Key, D::Service> = HashMap::new();
-
-            loop {
-                let changed = select! {
-                    _ = close_clone.notified() => {
-                        // info!("discover stream closed.")
-                        return; 
-                    },
-                    changed = poll_fn(|cx| 
discover.as_mut().poll_discover(cx)) => {
-                        changed
-                    }
-                };
-                let Some(changed) = changed else {
-                    // debug!("discover stream closed.");
-                    break;
-                };
-
-                match changed {
-                    Err(e) => {
-                        // error!("discover stream error: {}", e);
-                        continue;
-                    },
-                    Ok(changed) => match changed {
-                        Change::Insert(k, v) => {
-                            cache.insert(k, v);
-                        },
-                        Change::Remove(k) => {
-                            cache.remove(&k);
-                        }
-                    }
-                }
-
-                let vec: Vec<D::Service> = 
cache.values().map(|v|v.clone()).collect();
-                let _ = tx.send(vec);
-            }
-            
-        });
         Directory {
-            rx,
-            close
+            directory: Default::default(),
+            discover,
+            new_invoker: NewInvoker,
         }
     }
 }
@@ -265,34 +217,40 @@ where
 impl<D> Service<()> for Directory<D> 
 where
     // Discover
-    D: Discover + Unpin + Send,
-    // the key may be dubbo url
-    D::Key: Hash + Eq + Clone + Send,
-    // invoker new service
-    D::Service: NewService<()> + Clone + Send + Sync,
+    D: Discover<Key = String> + Unpin + Send, 
+    D::Error: Into<StdError> 
 { 
-    type Response = watch::Receiver<Vec<D::Service>>;
+    type Response = Vec<CloneInvoker<TripleInvoker>>;
 
     type Error = StdError;
 
     type Future = future::Ready<Result<Self::Response, Self::Error>>;
 
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), 
Self::Error>> {
-        Poll::Ready(Ok(()))
+        
+        loop {
+            let pin_discover = Pin::new(&mut self.discover);
+            let change = 
ready!(pin_discover.poll_discover(cx)).transpose().map_err(|e| e.into())?;
+            match change {
+                Some(Change::Remove(key)) => {
+                    debug!("remove key: {}", key);
+                    self.directory.remove(&key);
+                },  
+                Some(Change::Insert(key, _)) => {
+                    debug!("insert key: {}", key);
+                    let invoker = self.new_invoker.new_service(key.clone());
+                    self.directory.insert(key, invoker);
+                },
+                None => { 
+                    debug!("stream closed");
+                    return Poll::Ready(Ok(()));
+                }
+            }
+        }
     }
 
     fn call(&mut self, _: ()) -> Self::Future {
-        future::ok(self.rx.clone())
-    }
-}
-
-impl<D> Drop for Directory<D> 
-where
-    D: Discover, 
-{ 
-    fn drop(&mut self) {
-        if Arc::strong_count(&self.close) == 1 {
-            self.close.notify_one();
-        }
+        let vec = 
self.directory.values().map(|val|val.clone()).collect::<Vec<CloneInvoker<TripleInvoker>>>();
+        future::ok(vec)
     }
 }
\ No newline at end of file
diff --git a/dubbo/src/cluster/clone_body.rs b/dubbo/src/invoker/clone_body.rs
similarity index 91%
rename from dubbo/src/cluster/clone_body.rs
rename to dubbo/src/invoker/clone_body.rs
index 1d6b65a..5ce2e1f 100644
--- a/dubbo/src/cluster/clone_body.rs
+++ b/dubbo/src/invoker/clone_body.rs
@@ -14,32 +14,31 @@ use pin_project::pin_project;
 use thiserror::Error;
 
 use crate::StdError;
-
  
 #[derive(Error, Debug)]
 #[error("buffered body reach max capacity.")] 
 pub struct ReachMaxCapacityError;
 
-pub struct BufferedBody<B> {
-    shared:  Arc<Mutex<Option<OwnedBufferedBody<B>>>>,
-    owned: Option<OwnedBufferedBody<B>>,
+pub struct BufferedBody {
+    shared:  Arc<Mutex<Option<OwnedBufferedBody>>>,
+    owned: Option<OwnedBufferedBody>,
     replay_body: bool,
     replay_trailers: bool,
     is_empty: bool,
     size_hint: http_body::SizeHint,
 }
 
-pub struct OwnedBufferedBody<B> {
-    body: B,
+pub struct OwnedBufferedBody {
+    body: hyper::Body,
     trailers: Option<HeaderMap>,
     buf: InnerBuffer,
 }
 
 
 
-impl<B: http_body::Body> BufferedBody<B> {
+impl BufferedBody {
 
-    pub fn new(body: B, buf_size: usize) -> Self {
+    pub fn new(body: hyper::Body, buf_size: usize) -> Self {
         let size_hint = body.size_hint();
         let is_empty = body.is_end_stream();
         BufferedBody {
@@ -61,7 +60,7 @@ impl<B: http_body::Body> BufferedBody<B> {
 
 }
 
-impl<B> Clone for BufferedBody<B> {
+impl Clone for BufferedBody {
 
     fn clone(&self) -> Self {
         Self {
@@ -75,7 +74,7 @@ impl<B> Clone for BufferedBody<B> {
     }
 }
 
-impl<B> Drop for BufferedBody<B> {
+impl Drop for BufferedBody {
     fn drop(&mut self) {
         if let Some(owned) = self.owned.take() {
             let lock = self.shared.lock();
@@ -86,11 +85,8 @@ impl<B> Drop for BufferedBody<B> {
     }
 }
 
-impl<B> Body for BufferedBody<B> 
-where
-    B: http_body::Body + Unpin, 
-    B::Error: Into<StdError>,
-{
+impl Body for BufferedBody {
+
     type Data = BytesData;
     type Error = StdError;
 
@@ -328,24 +324,16 @@ impl Buf for BytesData {
 }
 
 #[pin_project]
-pub struct CloneBody<B>(#[pin] BufferedBody<B>);
-
-impl<B> CloneBody<B> 
-where
-    B: http_body::Body + Unpin, 
-    B::Error: Into<StdError>,
-{
-    pub fn new(inner_body: B) -> Self {
+pub struct CloneBody(#[pin] BufferedBody);
+
+impl CloneBody {
+    pub fn new(inner_body: hyper::Body) -> Self {
         let inner_body = BufferedBody::new(inner_body, 1024 * 64); 
         CloneBody(inner_body)
     }
 }
 
-impl<B> Body for CloneBody<B> 
-where
-    B: http_body::Body + Unpin, 
-    B::Error: Into<StdError>,
-{
+impl Body for CloneBody{
 
     type Data = BytesData;
 
@@ -371,11 +359,7 @@ where
 }
 
 
-impl<B> Clone for CloneBody<B> 
-where
-    B: http_body::Body + Unpin, 
-    B::Error: Into<StdError>,
-{
+impl Clone for CloneBody {
     fn clone(&self) -> Self {
         Self(self.0.clone())
     }
diff --git a/dubbo/src/cluster/clone_invoker.rs 
b/dubbo/src/invoker/clone_invoker.rs
similarity index 90%
rename from dubbo/src/cluster/clone_invoker.rs
rename to dubbo/src/invoker/clone_invoker.rs
index 20e1811..fe621b8 100644
--- a/dubbo/src/cluster/clone_invoker.rs
+++ b/dubbo/src/invoker/clone_invoker.rs
@@ -12,6 +12,8 @@ use tower_service::Service;
 
 use crate::StdError;
 
+use super::clone_body::CloneBody;
+ 
 enum Inner<S> {
     Invalid,
     Ready(S),
@@ -149,43 +151,42 @@ impl<S> Drop for ReadyService<S> {
     }
 }
 
-pub struct CloneInvoker<Inv, Req> 
+pub struct CloneInvoker<Inv> 
 where
-    Inv: Service<Req> + Send + 'static,
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
-    Req: Send
 {
-    inner: Buffer<ReadyService<Inv>, Req>,
+    inner: Buffer<ReadyService<Inv>, http::Request<CloneBody>>,
     rx: Receiver<ObserveState>,
     poll: ReusableBoxFuture<'static, ObserveState>,
     polling: bool,
 }
 
-impl<Inv, Req> CloneInvoker<Inv, Req> 
+impl<Inv> CloneInvoker<Inv> 
 where
-    Inv: Service<Req> + Send + 'static,
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
-    Req: Send + 'static
 {
 
+    const MAX_INVOKER_BUFFER_SIZE: usize = 16;
+    
     pub fn new(invoker: Inv) -> Self {
         
         let (ready_service, rx) = ReadyService::new(invoker);
 
-        let buffer: Buffer<ReadyService<Inv>, Req> = 
Buffer::new(ready_service, 1024);
+        let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> = 
Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE);
 
         Self { inner: buffer, rx, polling: false, poll: 
ReusableBoxFuture::new(futures::future::pending()) }
     }
 }
 
-impl<Inv, Req> Service<Req> for CloneInvoker<Inv, Req> 
+impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv> 
 where
-    Inv: Service<Req> + Send + 'static,
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
-    Req: Send + 'static
 {
     type Response = Inv::Response;
  
@@ -229,18 +230,17 @@ where
         }
     }
 
-    fn call(&mut self, req: Req) -> Self::Future {
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
         Box::pin(self.inner.call(req))
     }
 }
 
 
-impl<Inv, Req> Clone for CloneInvoker<Inv, Req> 
+impl<Inv> Clone for CloneInvoker<Inv> 
 where
-    Inv: Service<Req> + Send + 'static,
+    Inv: Service<http::Request<CloneBody>> + Send + 'static,
     Inv::Error: Into<StdError> + Send + Sync + 'static,
     Inv::Future: Send,
-    Req: Send
 {
     fn clone(&self) -> Self {
         Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, 
poll: ReusableBoxFuture::new(futures::future::pending())}
diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs
index 81bcb68..a8179ee 100644
--- a/dubbo/src/invoker/mod.rs
+++ b/dubbo/src/invoker/mod.rs
@@ -1,75 +1,21 @@
 use dubbo_base::Url;
-use tower_service::Service;
 
-use crate::{codegen::TripleInvoker, param::Param, svc::NewService};
+use crate::{codegen::TripleInvoker, svc::NewService, 
invoker::clone_invoker::CloneInvoker};
 
-#[derive(Clone)]
-pub struct NewInvoker {
-    url: Url
-}
+pub mod clone_body;
+pub mod clone_invoker;
 
-pub enum InvokerComponent {
-    TripleInvoker(TripleInvoker)
-}
 
+pub struct NewInvoker;
 
-impl NewInvoker {
-    pub fn new(url: Url) -> Self {
-        Self {
-            url
-        }
-    }
-}
-
-impl From<String> for NewInvoker {
-    fn from(url: String) -> Self {
-        Self {
-            url: Url::from_url(&url).unwrap()
-        }
-    }
-}
-
-impl Param<Url> for NewInvoker {
-    fn param(&self) -> Url {
-        self.url.clone()
-    }
-}
-
-impl NewService<()> for NewInvoker {
-    type Service = InvokerComponent;
-    fn new_service(&self, _: ()) -> Self::Service {
-        // todo create another invoker
-        InvokerComponent::TripleInvoker(TripleInvoker::new(self.url.clone()))
-    }
-}
-
-
-impl<B> Service<http::Request<B>> for InvokerComponent 
-where
-    B: http_body::Body + Unpin + Send + 'static,
-    B::Error: Into<crate::Error>,
-    B::Data: Send + Unpin,
-{
-    type Response = http::Response<crate::BoxBody>;
 
-    type Error = crate::Error;
+impl NewService<String> for NewInvoker {
+    type Service = CloneInvoker<TripleInvoker>;
 
-    type Future = crate::BoxFuture<Self::Response, Self::Error>;
+    fn new_service(&self, url: String) -> Self::Service {
+        // todo create another invoker by url protocol
 
-    fn call(&mut self, req: http::Request<B>) -> Self::Future {
-       match self {
-           InvokerComponent::TripleInvoker(invoker) => invoker.call(req),
-       }
+        let url = Url::from_url(&url).unwrap();
+        CloneInvoker::new(TripleInvoker::new(url))
     }
-
-    fn poll_ready(
-        &mut self,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), Self::Error>> {
-        match self {
-            InvokerComponent::TripleInvoker(invoker) => <TripleInvoker as 
Service<http::Request<B>>>::poll_ready(invoker, cx),
-        }
-    }
-}
-
-// InvokerComponent::TripleInvoker(invoker) => <TripleInvoker as 
Service<http::Request<B>>>::poll_ready(invoker, cx),
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs
index 822794c..334350e 100644
--- a/dubbo/src/loadbalancer/mod.rs
+++ b/dubbo/src/loadbalancer/mod.rs
@@ -1,8 +1,12 @@
 use futures_core::future::BoxFuture;
-use tower::{discover::ServiceList, ServiceExt};
+use tower::ServiceExt;
+use tower::discover::ServiceList;
 use tower_service::Service;
 
-use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param};
+use crate::invoker::clone_body::CloneBody;
+use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param, 
invoker::clone_invoker::CloneInvoker};
+
+use crate::protocol::triple::triple_invoker::TripleInvoker;
  
 pub struct NewLoadBalancer<N> {
     inner: N,
@@ -40,28 +44,21 @@ where
         let svc = self.inner.new_service(target);
 
         LoadBalancer {
-            inner: svc
+            inner: svc,
         }
         
     }
 }
 
-impl<N, Req, Nsv>  Service<Req> for LoadBalancer<N> 
+impl<N>  Service<http::Request<CloneBody>> for LoadBalancer<N> 
 where
-    Req: Send + 'static, 
     // Routes service
-    N: Service<(), Response = Vec<Nsv>> + Clone,
+    N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone,
     N::Error: Into<StdError> + Send,
     N::Future: Send + 'static, 
-    // new invoker
-    Nsv: NewService<()> + Send,
-    Nsv::Service: Service<Req> + Send,
-    // invoker
-    <Nsv::Service as Service<Req>>::Error: Into<StdError> + Send,
-    <Nsv::Service as Service<Req>>::Future: Send + 'static,
 {
     
-    type Response = <Nsv::Service as Service<Req>>::Response;
+    type Response = <CloneInvoker<TripleInvoker> as 
Service<http::Request<CloneBody>>>::Response;
 
     type Error = StdError;
 
@@ -69,30 +66,28 @@ where
 
     fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> 
std::task::Poll<Result<(), Self::Error>> {
         self.inner.poll_ready(cx).map_err(Into::into)
-        
+    
     }
 
-    fn call(&mut self, req: Req) -> Self::Future {
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
         let routes  = self.inner.call(());
 
         let fut = async move {
             let routes = routes.await;
 
-            let routes: Vec<Nsv> = match routes {
+            let routes: Vec<CloneInvoker<TripleInvoker>> = match routes {
                 Err(e) => return Err(Into::<StdError>::into(e)),
                 Ok(routes) => routes
             };
-
-            let service_list: Vec<_> = routes.iter().map(|inv| {
-                let invoker = inv.new_service(());                
+  
+            let service_list: Vec<_> = routes.into_iter().map(|invoker| {      
      
                 tower::load::Constant::new(invoker, 1)
-                
             }).collect();
 
             let service_list = ServiceList::new(service_list);
             
             let p2c = tower::balance::p2c::Balance::new(service_list);
-
+ 
 
             p2c.oneshot(req).await
         };
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs 
b/dubbo/src/protocol/triple/triple_invoker.rs
index 685c434..71eae90 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -16,15 +16,15 @@
  */
 
 use dubbo_base::Url;
+use http::{Uri, HeaderValue};
 use std::{
     fmt::{Debug, Formatter},
     str::FromStr,
 };
 use tower_service::Service;
 
-use crate::triple::transport::{connection::Connection, self};
+use crate::{triple::transport::{connection::Connection, self}, 
invoker::clone_body::CloneBody};
 
-#[derive(Clone)]
 pub struct TripleInvoker {
     url: Url,
     conn: Connection,
@@ -35,7 +35,7 @@ impl TripleInvoker {
         let uri = http::Uri::from_str(&url.to_url()).unwrap();
         Self {
             url,
-            conn: Connection::new().with_host(uri),
+            conn: Connection::new().with_host(uri).build(),
         }
     }
 }
@@ -46,27 +46,104 @@ impl Debug for TripleInvoker {
     }
 }
 
-impl<B> Service<http::Request<B>> for TripleInvoker 
-where
-    B: http_body::Body + Unpin + Send + 'static,
-    B::Error: Into<crate::Error>,
-    B::Data: Send + Unpin,
-{
+impl TripleInvoker {
+    pub fn map_request(
+        &self,
+        req: http::Request<CloneBody>,
+    ) -> http::Request<CloneBody> {
+       
+        let (parts, body) = req.into_parts();
+
+        let path_and_query = 
parts.headers.get("path").unwrap().to_str().unwrap();
+
+        let authority  = self.url.clone().get_ip_port();
+        
+        let uri = 
Uri::builder().scheme("http").authority(authority).path_and_query(path_and_query).build().unwrap();
+
+        let mut req = hyper::Request::builder()
+            .version(http::Version::HTTP_2)
+            .uri(uri.clone())
+            .method("POST")
+            .body(body)
+            .unwrap();
+
+        // *req.version_mut() = http::Version::HTTP_2;
+        req.headers_mut()
+            .insert("method", HeaderValue::from_static("POST"));
+        req.headers_mut().insert(
+            "scheme",
+            HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
+        );
+        req.headers_mut()
+            .insert("path", HeaderValue::from_str(uri.path()).unwrap());
+        req.headers_mut().insert(
+            "authority",
+            HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
+        );
+        req.headers_mut().insert(
+            "content-type",
+            HeaderValue::from_static("application/grpc+proto"),
+        );
+        req.headers_mut()
+            .insert("user-agent", 
HeaderValue::from_static("dubbo-rust/0.1.0"));
+        req.headers_mut()
+            .insert("te", HeaderValue::from_static("trailers"));
+        req.headers_mut().insert(
+            "tri-service-version",
+            HeaderValue::from_static("dubbo-rust/0.1.0"),
+        );
+        req.headers_mut()
+            .insert("tri-service-group", HeaderValue::from_static("cluster"));
+        req.headers_mut().insert(
+            "tri-unit-info",
+            HeaderValue::from_static("dubbo-rust/0.1.0"),
+        );
+        // if let Some(_encoding) = self.send_compression_encoding {
+            
+        // }
+
+        req.headers_mut()
+                .insert("grpc-encoding", 
http::HeaderValue::from_static("gzip"));
+            
+        req.headers_mut().insert(
+            "grpc-accept-encoding",
+            http::HeaderValue::from_static("gzip"),
+        );
+
+        // // const (
+        // //     TripleContentType    = "application/grpc+proto"
+        // //     TripleUserAgent      = "grpc-go/1.35.0-dev"
+        // //     TripleServiceVersion = "tri-service-version"
+        // //     TripleAttachement    = "tri-attachment"
+        // //     TripleServiceGroup   = "tri-service-group"
+        // //     TripleRequestID      = "tri-req-id"
+        // //     TripleTraceID        = "tri-trace-traceid"
+        // //     TripleTraceRPCID     = "tri-trace-rpcid"
+        // //     TripleTraceProtoBin  = "tri-trace-proto-bin"
+        // //     TripleUnitInfo       = "tri-unit-info"
+        // // )
+        req
+    }
+}
+
+impl Service<http::Request<CloneBody>> for TripleInvoker {
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
 
     type Future = crate::BoxFuture<Self::Response, Self::Error>;
 
-    fn call(&mut self, req: http::Request<B>) -> Self::Future {
-        self.conn.call(req)
-    }
-
     fn poll_ready(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), Self::Error>> {
-        <transport::connection::Connection as 
Service<http::Request<B>>>::poll_ready(&mut self.conn, cx)
+        <transport::connection::Connection as 
Service<http::Request<CloneBody>>>::poll_ready(&mut self.conn, cx)
+    }
+
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+        let req = self.map_request(req);
+
+        self.conn.call(req)
     }
 }
 
diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs
index 69300f2..9b6dca5 100644
--- a/dubbo/src/registry/n_registry.rs
+++ b/dubbo/src/registry/n_registry.rs
@@ -6,9 +6,9 @@ use tokio::sync::mpsc::{Receiver, channel};
 use tower::discover::Change;
 
 
-use crate::{StdError, invoker::NewInvoker};
+use crate::StdError;
 
-type DiscoverStream = Receiver<Result<Change<String, NewInvoker>, StdError>>;
+type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>;
 
 #[async_trait]
 pub trait Registry {
@@ -51,19 +51,19 @@ impl ArcRegistry {
 impl Registry for ArcRegistry {
     
     async fn register(&self, url: Url) -> Result<(), StdError> {
-        self.register(url).await
+        self.inner.register(url).await
     }
 
     async fn unregister(&self, url: Url) -> Result<(), StdError> {
-        self.unregister(url).await
+        self.inner.unregister(url).await
     }
 
     async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError> {
-        self.subscribe(service_name).await
+        self.inner.subscribe(service_name).await
     }
 
     async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
-        self.unsubscribe(url).await
+        self.inner.unsubscribe(url).await
     }
 }
 
@@ -81,7 +81,11 @@ impl Registry for RegistryComponent {
     }
 
     async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError> {
-        todo!()
+        match self {
+            RegistryComponent::NacosRegistry => todo!(),
+            RegistryComponent::ZookeeperRegistry => todo!(),
+            RegistryComponent::StaticRegistry(registry) => 
registry.subscribe(service_name).await,
+        }
     }
 
     async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
@@ -113,8 +117,7 @@ impl Registry for StaticRegistry {
     async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, 
StdError> {
         let (tx, rx) = channel(self.urls.len());
         for url in self.urls.iter() {
-            let invoker = NewInvoker::new(url.clone());
-            let change = Ok(Change::Insert(service_name.clone(), invoker));
+            let change = Ok(Change::Insert(url.to_url(), ()));
             tx.send(change).await?;
         }      
 
diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs
index dfbe73e..cff3cce 100644
--- a/dubbo/src/route/mod.rs
+++ b/dubbo/src/route/mod.rs
@@ -1,38 +1,35 @@
-use std::{sync::{Arc, Mutex}, collections::HashMap};
+use std::pin::Pin;
 
+use dubbo_logger::tracing::debug;
 use futures_core::{Future, ready};
-use futures_util::future::Ready;
-use pin_project::pin_project;
-use tokio::{sync::watch, pin};
+use futures_util::{future::Ready, FutureExt, TryFutureExt};
 use tower::{util::FutureService, buffer::Buffer};
 use tower_service::Service;
   
-use crate::{StdError, codegen::RpcInvocation, svc::NewService, param::Param, 
invocation::Invocation};
+use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, 
svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker};
 
 pub struct NewRoutes<N> {
     inner: N,
 } 
 
-pub struct NewRoutesCache<N> 
-where
-    N: NewService<RpcInvocation>
-{
-    inner: N,
-    cache: Arc<Mutex<HashMap<String, N::Service>>>,
-}
 
-#[pin_project]
-pub struct NewRoutesFuture<N, T> {
-    #[pin]
-    inner: N,
+pub struct NewRoutesFuture<S, T> {
+    inner: RoutesFutureInnerState<S>,
     target: T,
 }
+
+
+pub enum RoutesFutureInnerState<S> {
+    Service(S),
+    Future(Pin<Box<dyn Future<Output = 
Result<Vec<CloneInvoker<TripleInvoker>>, StdError>> + Send + 'static>>),
+    Ready(Vec<CloneInvoker<TripleInvoker>>),
+}
+
  
 #[derive(Clone)]
-pub struct Routes<Nsv, T> {
+pub struct Routes<T> {
     target: T,
-    new_invokers: Vec<Nsv>,
-    invokers_receiver: watch::Receiver<Vec<Nsv>>,
+    invokers: Vec<CloneInvoker<TripleInvoker>>
 }
 
 impl<N> NewRoutes<N> {
@@ -43,154 +40,101 @@ impl<N> NewRoutes<N> {
     }
 }
 
+impl<N> NewRoutes<N> {
+    const MAX_ROUTE_BUFFER_SIZE: usize = 16;
 
-impl<N, T, Nsv> NewService<T> for NewRoutes<N> 
-where
-    T: Param<RpcInvocation> + Clone + Send + 'static, 
-    // NewDirectory
-    N: NewService<T>,
-    // Directory
-    N::Service: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin + 
Send + 'static,
-    <N::Service as Service<()>>::Error: Into<StdError>,
-    // new invoker service
-    Nsv: NewService<()> + Clone + Send + Sync + 'static,
-{
-
-    type Service = Buffer<FutureService<NewRoutesFuture<<N as 
NewService<T>>::Service, T>, Routes<Nsv, T>>, ()>;
-
-    fn new_service(&self, target: T) -> Self::Service {
-        let inner = self.inner.new_service(target.clone());
-        
-        Buffer::new(FutureService::new(NewRoutesFuture {
-            inner,
-            target,
-        }), 1024)
-    }
-}
-impl<N, Nsv> NewRoutesCache<N> 
-where
-    N: NewService<RpcInvocation>,
-    <N as NewService<RpcInvocation>>::Service: Service<(), Response = 
watch::Receiver<Vec<Nsv>>> + Unpin + Send + 'static,
-    <N::Service as Service<()>>::Error: Into<StdError>,
-    Nsv: NewService<()> + Clone + Send + Sync + 'static,
-
-{
-    pub fn layer() -> impl tower_layer::Layer<N, Service = 
NewRoutesCache<NewRoutes<N>>> {
+    pub fn layer() -> impl tower_layer::Layer<N, Service = Self> {
         tower_layer::layer_fn(|inner: N| {
-            NewRoutesCache::new(NewRoutes::new(inner))
+            NewRoutes::new(inner)
         })
     }
-
-
 }
 
 
-impl<N> NewRoutesCache<N> 
+impl<N, T> NewService<T> for NewRoutes<N> 
 where
-    N: NewService<RpcInvocation>
-{
-    pub fn new(inner: N) -> Self {
-        Self {
-            inner,
-            cache: Default::default(),
-        }
-    }
-}
+    T: Param<RpcInvocation> + Clone + Send + Unpin + 'static, 
+    // NewDirectory
+    N: NewService<T>,
+    // Directory   
+    N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + 
Unpin + Send + 'static,
+    <N::Service as Service<()>>::Error: Into<StdError>, 
+    <N::Service as Service<()>>::Future: Send + 'static,
+{ 
 
-impl<N, T> NewService<T> for NewRoutesCache<N> 
-where
-    T: Param<RpcInvocation>,
-    N: NewService<RpcInvocation>,
-    N::Service: Clone,
-{
-    type Service = N::Service;
+    type Service = Buffer<FutureService<NewRoutesFuture<<N as 
NewService<T>>::Service, T>, Routes<T>>, ()>;
 
     fn new_service(&self, target: T) -> Self::Service {
-        let rpc_inv = target.param();
-        let service_name = rpc_inv.get_target_service_unique_name();
-
-        let mut cache = self.cache.lock().expect("RoutesCache get lock 
failed");
-
-        let service = cache.get(&service_name);
-        match service {
-            Some(service) => service.clone(),
-            None => {
-                let service = self.inner.new_service(rpc_inv);
-                cache.insert(service_name, service.clone());
-                service
-            }
-        }
+        let inner = self.inner.new_service(target.clone());
+
+        Buffer::new(FutureService::new(NewRoutesFuture {
+            inner: RoutesFutureInnerState::Service(inner),
+            target,
+        }), Self::MAX_ROUTE_BUFFER_SIZE)
     }
 }
 
 
-impl<N, T, Nsv> Future for NewRoutesFuture<N, T> 
+impl<N, T> Future for NewRoutesFuture<N, T> 
 where
-    T: Param<RpcInvocation> + Clone, 
+    T: Param<RpcInvocation> + Clone + Unpin, 
     // Directory
-    N: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin,
+    N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin,
     N::Error: Into<StdError>,
-     // new invoker service
-    Nsv: NewService<()> + Clone,
+    N::Future: Send + 'static,
 {
-    type Output = Result<Routes<Nsv, T>, StdError>;
+    type Output = Result<Routes<T>, StdError>;
 
     fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) 
-> std::task::Poll<Self::Output> {
 
         let this = self.get_mut();
 
-        let target = this.target.clone();
-
-        let _ = ready!(this.inner.poll_ready(cx)).map_err(Into::into)?;
-
-
-        let call = this.inner.call(());
-        pin!(call);
-    
-        let mut invokers_receiver = ready!(call.poll(cx).map_err(Into::into))?;
-        let new_invokers = {
-            let wait_for =  invokers_receiver.wait_for(|invs|!invs.is_empty());
-            pin!(wait_for);
-    
-            let changed = ready!(wait_for.poll(cx))?;
-    
-            changed.clone()
-        };
-         
-
-        std::task::Poll::Ready(Ok(Routes {
-            invokers_receiver,
-            new_invokers,
-            target,
-        }))
+        loop {
+            match this.inner {
+                RoutesFutureInnerState::Service(ref mut service) => {
+                    debug!("RoutesFutureInnerState::Service");
+                    let _ = 
ready!(service.poll_ready(cx)).map_err(Into::into)?;
+                    let fut = service.call(()).map_err(|e|e.into()).boxed();
+                    this.inner = RoutesFutureInnerState::Future(fut);
+                },
+                RoutesFutureInnerState::Future(ref mut futures) => {
+                    debug!("RoutesFutureInnerState::Future");
+                    let invokers = ready!(futures.as_mut().poll(cx))?;
+                    this.inner = RoutesFutureInnerState::Ready(invokers);
+                },
+                RoutesFutureInnerState::Ready(ref invokers) => {
+                    debug!("RoutesFutureInnerState::Ready");
+                    let target = this.target.clone();
+                    return std::task::Poll::Ready(Ok(Routes {
+                        invokers: invokers.clone(),
+                        target,
+                    }));
+                },
+            }
+        }
+        
     }
 }
 
 
 
-impl<Nsv,T> Service<()> for Routes<Nsv, T> 
+impl<T> Service<()> for Routes<T> 
 where
     T: Param<RpcInvocation> + Clone, 
-     // new invoker service
-    Nsv: NewService<()> + Clone,
 { 
-    type Response = Vec<Nsv>;
+    type Response = Vec<CloneInvoker<TripleInvoker>>;
  
     type Error = StdError;
   
     type Future = Ready<Result<Self::Response, Self::Error>>; 
 
     fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> 
std::task::Poll<Result<(), Self::Error>> {
-        let has_change = self.invokers_receiver.has_changed()?;
-        if has_change {
-            self.new_invokers = 
self.invokers_receiver.borrow_and_update().clone();
-        }
         std::task::Poll::Ready(Ok(()))
     }
 
     fn call(&mut self, _: ()) -> Self::Future {
         // some router operator
         // if new_invokers changed, send new invokers to routes_rx after 
router operator
-        futures_util::future::ok(self.new_invokers.clone()) 
+        futures_util::future::ok(self.invokers.clone()) 
     }
 }
\ No newline at end of file
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index 89d7a00..ed08021 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -19,7 +19,7 @@
 use std::sync::Arc;
 
 use crate::{
-    utils::boxed_clone::BoxCloneService, 
registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, 
route::{NewRoutes, NewRoutesCache}, loadbalancer::NewLoadBalancer, 
cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, 
svc::{ArcNewService, NewService, BoxedService}, StdError, 
codegen::RpcInvocation, BoxBody,
+    utils::boxed_clone::BoxCloneService, 
registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, 
route::NewRoutes, loadbalancer::NewLoadBalancer, cluster::{NewCluster, 
Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, 
BoxedService}, StdError, codegen::RpcInvocation, BoxBody,
 };
 
 use aws_smithy_http::body::SdkBody;
@@ -30,7 +30,7 @@ pub type ClientBoxService =
     BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, 
crate::Error>;
 
 
-pub type ServiceMK = 
Arc<NewCluster<NewLoadBalancer<NewRoutesCache<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>>;
+pub type ServiceMK = 
Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>;
 
 #[derive(Default)]
 pub struct ClientBuilder {
@@ -102,7 +102,7 @@ impl ClientBuilder {
         let mk_service = ServiceBuilder::new()
                 .layer(NewCluster::layer())
                 .layer(NewLoadBalancer::layer())
-                .layer(NewRoutesCache::layer())
+                .layer(NewRoutes::layer())
                 .layer(NewCachedDirectory::layer())
                 .service(registry);
 
diff --git a/dubbo/src/triple/client/triple.rs 
b/dubbo/src/triple/client/triple.rs
index 091e020..9bcf144 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -153,6 +153,7 @@ impl TripleClient {
 
 
         let request = http::Request::builder()
+            .header("path", path.to_string())
             .body(body).unwrap();
 
        
@@ -215,6 +216,7 @@ impl TripleClient {
 
 
         let request = http::Request::builder()
+            .header("path", path.to_string())
             .body(body).unwrap();
 
 
@@ -259,6 +261,7 @@ impl TripleClient {
 
 
         let request = http::Request::builder()
+            .header("path", path.to_string())
             .body(body).unwrap();
 
 
@@ -321,6 +324,7 @@ impl TripleClient {
 
 
         let request = http::Request::builder()
+            .header("path", path.to_string())
             .body(body).unwrap();
 
 
diff --git a/dubbo/src/triple/transport/connection.rs 
b/dubbo/src/triple/transport/connection.rs
index 1b97875..3bbaa17 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-use std::task::Poll;
-
-use dubbo_logger::tracing::debug;
 use hyper::client::{conn::Builder, service::Connect};
 use tower_service::Service;
 
-use crate::{boxed, triple::transport::connector::get_connector};
+use crate::{boxed, triple::transport::connector::get_connector, StdError, 
invoker::clone_body::CloneBody};
+
+type HyperConnect = 
Connect<crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, 
StdError>, CloneBody, http::Uri>;
 
-#[derive(Debug, Clone)]
 pub struct Connection {
     host: hyper::Uri,
     connector: &'static str,
     builder: Builder,
+    connect: Option<HyperConnect>,
 }
 
 impl Default for Connection {
@@ -42,6 +41,7 @@ impl Connection {
             host: hyper::Uri::default(),
             connector: "http",
             builder: Builder::new(),
+            connect: None,
         }
     }
 
@@ -59,14 +59,19 @@ impl Connection {
         self.builder = builder;
         self
     }
+
+    pub fn build(mut self) -> Self {
+        let builder = self.builder.clone().http2_only(true).to_owned();
+        let hyper_connect: HyperConnect = 
Connect::new(get_connector(self.connector), builder);
+        self.connect = Some(hyper_connect);
+        self
+
+    }
 }
 
-impl<ReqBody> Service<http::Request<ReqBody>> for Connection
-where
-    ReqBody: http_body::Body + Unpin + Send + 'static,
-    ReqBody::Data: Send + Unpin,
-    ReqBody::Error: Into<crate::Error>,
-{
+impl Service<http::Request<CloneBody>> for Connection {
+
+
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
@@ -75,25 +80,36 @@ where
 
     fn poll_ready(
         &mut self,
-        _cx: &mut std::task::Context<'_>,
+        cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
+        match self.connect {
+            None => {
+                panic!("connection must be built before use")
+            },
+            Some(ref mut connect) => {
+                connect.poll_ready(cx).map_err(|e|e.into())
+            }
+        }
     }
 
-    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
-        let builder = self.builder.clone().http2_only(true).to_owned();
-        let mut connector = Connect::new(get_connector(self.connector), 
builder);
-        let uri = self.host.clone();
-        let fut = async move {
-            debug!("send base call to {}", uri);
-            let mut con = connector.call(uri).await.unwrap();
-
-            con.call(req)
-                .await
-                .map_err(|err| err.into())
-                .map(|res| res.map(boxed))
-        };
-
-        Box::pin(fut)
+    fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future {
+      
+        match self.connect {
+            None => {
+                panic!("connection must be built before use")
+            },
+            Some(ref mut connect) => {
+                let uri = self.host.clone();
+                let call_fut = connect.call(uri);
+                let fut = async move {
+                let mut con = call_fut.await.unwrap();
+                con.call(req).await
+                    .map_err(|err| err.into())
+                    .map(|res| res.map(boxed))
+                };
+
+                return Box::pin(fut)
+            }
+        }
     }
 }
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs 
b/examples/echo/src/generated/grpc.examples.echo.rs
index e756d5c..07c58fe 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -17,7 +17,7 @@ pub mod echo_client {
     #![allow(unused_variables, dead_code, missing_docs, 
clippy::let_unit_value)]
     use dubbo::codegen::*;
     /// Echo is the echo service.
-    #[derive(Debug, Clone, Default)]
+    #[derive(Clone)]
     pub struct EchoClient {
         inner: TripleClient,
     }
diff --git a/examples/greeter/src/greeter/client.rs 
b/examples/greeter/src/greeter/client.rs
index 1d59cdf..afa1b20 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -21,21 +21,17 @@ pub mod protos {
 }
 
 use std::env;
-
+ 
 use dubbo::codegen::*;
 
-use dubbo_base::Url;
 use futures_util::StreamExt;
 use protos::{greeter_client::GreeterClient, GreeterRequest};
-use registry_nacos::NacosRegistry;
-use registry_zookeeper::ZookeeperRegistry;
 
 #[tokio::main]
 async fn main() {
     dubbo_logger::init();
-
-    let mut builder = ClientBuilder::new();
-    builder.with_host("http://127.0.0.1:8888";);
+ 
+    let builder = ClientBuilder::new().with_host("http://127.0.0.1:8888";);
 
     let mut cli = GreeterClient::new(builder);
 
@@ -47,7 +43,7 @@ async fn main() {
         .await;
     let resp = match resp {
         Ok(resp) => resp,
-        Err(err) => return println!("{:?}", err),
+        Err(err) => return println!("response error: {:?}", err),
     };
     let (_parts, body) = resp.into_parts();
     println!("Response: {:?}", body);
diff --git a/examples/greeter/src/greeter/server.rs 
b/examples/greeter/src/greeter/server.rs
index 94e4e53..c3bc4c5 100644
--- a/examples/greeter/src/greeter/server.rs
+++ b/examples/greeter/src/greeter/server.rs
@@ -22,7 +22,7 @@ use futures_util::{Stream, StreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 
-use dubbo::{codegen::*, Dubbo};
+use dubbo::{codegen::*, Dubbo, registry::memory_registry::MemoryRegistry};
 use dubbo_config::RootConfig;
 use dubbo_logger::{
     tracing::{info, span},
@@ -50,7 +50,7 @@ async fn main() {
     register_server(GreeterServerImpl {
         name: "greeter".to_string(),
     });
-    let zkr = ZookeeperRegistry::default();
+    // let zkr: ZookeeperRegistry = ZookeeperRegistry::default();
     let r = RootConfig::new();
     let r = match r.load() {
         Ok(config) => config,
@@ -58,7 +58,9 @@ async fn main() {
     };
     let mut f = Dubbo::new()
         .with_config(r)
-        .add_registry("zookeeper", Box::new(zkr));
+        .add_registry("memory_registry", Box::new(MemoryRegistry::new()));
+
+
     f.start().await;
 }
 


Reply via email to