This is an automated email from the ASF dual-hosted git repository. yangyang pushed a commit to branch refact/cluster in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/refact/cluster by this push: new a0c7516 Tst: local test passed (#166) a0c7516 is described below commit a0c751699d01db8e8fab653243186451e00e18e9 Author: 毛文超 <ad...@onew.me> AuthorDate: Thu Nov 23 10:46:37 2023 +0800 Tst: local test passed (#166) * Tst: local test passed * Enhance: remove unnecessary key * Enhance: add BUFFER SIZE const variable --- dubbo-build/src/client.rs | 2 +- dubbo/src/cluster/mod.rs | 16 +- dubbo/src/directory/mod.rs | 148 ++++++----------- dubbo/src/{cluster => invoker}/clone_body.rs | 50 ++---- dubbo/src/{cluster => invoker}/clone_invoker.rs | 30 ++-- dubbo/src/invoker/mod.rs | 76 ++------- dubbo/src/loadbalancer/mod.rs | 37 ++--- dubbo/src/protocol/triple/triple_invoker.rs | 105 ++++++++++-- dubbo/src/registry/n_registry.rs | 21 ++- dubbo/src/route/mod.rs | 192 ++++++++-------------- dubbo/src/triple/client/builder.rs | 6 +- dubbo/src/triple/client/triple.rs | 4 + dubbo/src/triple/transport/connection.rs | 72 ++++---- examples/echo/src/generated/grpc.examples.echo.rs | 2 +- examples/greeter/src/greeter/client.rs | 12 +- examples/greeter/src/greeter/server.rs | 8 +- 16 files changed, 351 insertions(+), 430 deletions(-) diff --git a/dubbo-build/src/client.rs b/dubbo-build/src/client.rs index 418bc10..b222f75 100644 --- a/dubbo-build/src/client.rs +++ b/dubbo-build/src/client.rs @@ -65,7 +65,7 @@ pub fn generate<T: Service>( #service_doc #(#struct_attributes)* - #[derive(Debug, Clone, Default)] + #[derive(Clone)] pub struct #service_ident { inner: TripleClient, } diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index 47394e2..0b1654a 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -19,12 +19,10 @@ use http::Request; use tower_service::Service; -use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param}; +use crate::{codegen::RpcInvocation, svc::NewService, param::Param, invoker::clone_body::CloneBody}; + +use self::failover::Failover; -use self::{failover::Failover, clone_body::CloneBody}; - -mod clone_body; -mod clone_invoker; mod failover; pub struct NewCluster<N> { @@ -65,11 +63,9 @@ where } } -impl<S, B> Service<Request<B>> for Cluster<S> +impl<S> Service<Request<hyper::Body>> for Cluster<S> where - S: Service<Request<CloneBody<B>>>, - B: http_body::Body + Unpin, - B::Error: Into<StdError>, + S: Service<Request<CloneBody>>, { type Response = S::Response; @@ -82,7 +78,7 @@ where self.inner.poll_ready(cx) } - fn call(&mut self, req: Request<B>) -> Self::Future { + fn call(&mut self, req: Request<hyper::Body>) -> Self::Future { let (parts, body) = req.into_parts(); let clone_body = CloneBody::new(body); let req = Request::from_parts(parts, clone_body); diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs index 0861d32..a4cf466 100644 --- a/dubbo/src/directory/mod.rs +++ b/dubbo/src/directory/mod.rs @@ -15,15 +15,15 @@ * limitations under the License. */ - use core::panic; use std::{ - hash::Hash, - task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, + task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, pin::Pin, }; -use crate::{StdError, codegen::RpcInvocation, invocation::Invocation, registry::n_registry::Registry, invoker::NewInvoker, svc::NewService, param::Param}; -use futures_util::future::{poll_fn, self}; -use tokio::{sync::{watch, Notify, mpsc::channel}, select}; +use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, invocation::Invocation, registry::n_registry::Registry, invoker::{NewInvoker,clone_invoker::CloneInvoker}, svc::NewService, param::Param}; +use dubbo_logger::tracing::debug; +use futures_core::ready; +use futures_util::future; +use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tower::{ discover::{Change, Discover}, buffer::Buffer, @@ -31,7 +31,7 @@ use tower::{ use tower_service::Service; -type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, NewInvoker>, StdError>>>, ()>; +type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>; pub struct NewCachedDirectory<N> where @@ -57,14 +57,10 @@ pub struct NewDirectory<N> { } - -#[derive(Clone)] -pub struct Directory<D> -where - D: Discover -{ - rx: watch::Receiver<Vec<D::Service>>, - close: Arc<Notify> +pub struct Directory<D> { + directory: HashMap<String, CloneInvoker<TripleInvoker>>, + discover: D, + new_invoker: NewInvoker, } @@ -143,6 +139,8 @@ where impl<N> NewDirectory<N> { + const MAX_DIRECTORY_BUFFER_SIZE: usize = 16; + pub fn new(inner: N) -> Self { NewDirectory { inner @@ -150,6 +148,8 @@ impl<N> NewDirectory<N> { } } + + impl<N, T> NewService<T> for NewDirectory<N> where T: Param<RpcInvocation>, @@ -157,6 +157,8 @@ where N: Registry + Clone + Send + Sync + 'static, { type Service = BufferedDirectory; + + fn new_service(&self, target: T) -> Self::Service { @@ -164,26 +166,27 @@ where let registry = self.inner.clone(); - let (tx, rx) = channel(1024); + let (tx, rx) = channel(Self::MAX_DIRECTORY_BUFFER_SIZE); tokio::spawn(async move { - let receiver = registry.subscribe(service_name).await; + debug!("discover start!"); match receiver { Err(e) => { // error!("discover stream error: {}", e); - + debug!("discover stream error"); }, Ok(mut receiver) => { loop { let change = receiver.recv().await; + debug!("receive change: {:?}", change); match change { None => { - // debug!("discover stream closed."); + debug!("discover stream closed."); break; }, Some(change) => { - let _ = tx.send(change); + let _ = tx.send(change).await; } } } @@ -192,71 +195,20 @@ where }); - Buffer::new(Directory::new(ReceiverStream::new(rx)), 1024) + Buffer::new(Directory::new(ReceiverStream::new(rx)), Self::MAX_DIRECTORY_BUFFER_SIZE) } } -impl<D> Directory<D> -where - // Discover - D: Discover + Unpin + Send + 'static, - // the key may be dubbo url - D::Key: Hash + Eq + Clone + Send, - // invoker new service - D::Service: NewService<()> + Clone + Send + Sync, -{ +impl<D> Directory<D> { pub fn new(discover: D) -> Self { - let mut discover = Box::pin(discover); - - let (tx, rx) = watch::channel(Vec::new()); - let close = Arc::new(Notify::new()); - let close_clone = close.clone(); - - tokio::spawn(async move { - let mut cache: HashMap<D::Key, D::Service> = HashMap::new(); - - loop { - let changed = select! { - _ = close_clone.notified() => { - // info!("discover stream closed.") - return; - }, - changed = poll_fn(|cx| discover.as_mut().poll_discover(cx)) => { - changed - } - }; - let Some(changed) = changed else { - // debug!("discover stream closed."); - break; - }; - - match changed { - Err(e) => { - // error!("discover stream error: {}", e); - continue; - }, - Ok(changed) => match changed { - Change::Insert(k, v) => { - cache.insert(k, v); - }, - Change::Remove(k) => { - cache.remove(&k); - } - } - } - - let vec: Vec<D::Service> = cache.values().map(|v|v.clone()).collect(); - let _ = tx.send(vec); - } - - }); Directory { - rx, - close + directory: Default::default(), + discover, + new_invoker: NewInvoker, } } } @@ -265,34 +217,40 @@ where impl<D> Service<()> for Directory<D> where // Discover - D: Discover + Unpin + Send, - // the key may be dubbo url - D::Key: Hash + Eq + Clone + Send, - // invoker new service - D::Service: NewService<()> + Clone + Send + Sync, + D: Discover<Key = String> + Unpin + Send, + D::Error: Into<StdError> { - type Response = watch::Receiver<Vec<D::Service>>; + type Response = Vec<CloneInvoker<TripleInvoker>>; type Error = StdError; type Future = future::Ready<Result<Self::Response, Self::Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) + + loop { + let pin_discover = Pin::new(&mut self.discover); + let change = ready!(pin_discover.poll_discover(cx)).transpose().map_err(|e| e.into())?; + match change { + Some(Change::Remove(key)) => { + debug!("remove key: {}", key); + self.directory.remove(&key); + }, + Some(Change::Insert(key, _)) => { + debug!("insert key: {}", key); + let invoker = self.new_invoker.new_service(key.clone()); + self.directory.insert(key, invoker); + }, + None => { + debug!("stream closed"); + return Poll::Ready(Ok(())); + } + } + } } fn call(&mut self, _: ()) -> Self::Future { - future::ok(self.rx.clone()) - } -} - -impl<D> Drop for Directory<D> -where - D: Discover, -{ - fn drop(&mut self) { - if Arc::strong_count(&self.close) == 1 { - self.close.notify_one(); - } + let vec = self.directory.values().map(|val|val.clone()).collect::<Vec<CloneInvoker<TripleInvoker>>>(); + future::ok(vec) } } \ No newline at end of file diff --git a/dubbo/src/cluster/clone_body.rs b/dubbo/src/invoker/clone_body.rs similarity index 91% rename from dubbo/src/cluster/clone_body.rs rename to dubbo/src/invoker/clone_body.rs index 1d6b65a..5ce2e1f 100644 --- a/dubbo/src/cluster/clone_body.rs +++ b/dubbo/src/invoker/clone_body.rs @@ -14,32 +14,31 @@ use pin_project::pin_project; use thiserror::Error; use crate::StdError; - #[derive(Error, Debug)] #[error("buffered body reach max capacity.")] pub struct ReachMaxCapacityError; -pub struct BufferedBody<B> { - shared: Arc<Mutex<Option<OwnedBufferedBody<B>>>>, - owned: Option<OwnedBufferedBody<B>>, +pub struct BufferedBody { + shared: Arc<Mutex<Option<OwnedBufferedBody>>>, + owned: Option<OwnedBufferedBody>, replay_body: bool, replay_trailers: bool, is_empty: bool, size_hint: http_body::SizeHint, } -pub struct OwnedBufferedBody<B> { - body: B, +pub struct OwnedBufferedBody { + body: hyper::Body, trailers: Option<HeaderMap>, buf: InnerBuffer, } -impl<B: http_body::Body> BufferedBody<B> { +impl BufferedBody { - pub fn new(body: B, buf_size: usize) -> Self { + pub fn new(body: hyper::Body, buf_size: usize) -> Self { let size_hint = body.size_hint(); let is_empty = body.is_end_stream(); BufferedBody { @@ -61,7 +60,7 @@ impl<B: http_body::Body> BufferedBody<B> { } -impl<B> Clone for BufferedBody<B> { +impl Clone for BufferedBody { fn clone(&self) -> Self { Self { @@ -75,7 +74,7 @@ impl<B> Clone for BufferedBody<B> { } } -impl<B> Drop for BufferedBody<B> { +impl Drop for BufferedBody { fn drop(&mut self) { if let Some(owned) = self.owned.take() { let lock = self.shared.lock(); @@ -86,11 +85,8 @@ impl<B> Drop for BufferedBody<B> { } } -impl<B> Body for BufferedBody<B> -where - B: http_body::Body + Unpin, - B::Error: Into<StdError>, -{ +impl Body for BufferedBody { + type Data = BytesData; type Error = StdError; @@ -328,24 +324,16 @@ impl Buf for BytesData { } #[pin_project] -pub struct CloneBody<B>(#[pin] BufferedBody<B>); - -impl<B> CloneBody<B> -where - B: http_body::Body + Unpin, - B::Error: Into<StdError>, -{ - pub fn new(inner_body: B) -> Self { +pub struct CloneBody(#[pin] BufferedBody); + +impl CloneBody { + pub fn new(inner_body: hyper::Body) -> Self { let inner_body = BufferedBody::new(inner_body, 1024 * 64); CloneBody(inner_body) } } -impl<B> Body for CloneBody<B> -where - B: http_body::Body + Unpin, - B::Error: Into<StdError>, -{ +impl Body for CloneBody{ type Data = BytesData; @@ -371,11 +359,7 @@ where } -impl<B> Clone for CloneBody<B> -where - B: http_body::Body + Unpin, - B::Error: Into<StdError>, -{ +impl Clone for CloneBody { fn clone(&self) -> Self { Self(self.0.clone()) } diff --git a/dubbo/src/cluster/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs similarity index 90% rename from dubbo/src/cluster/clone_invoker.rs rename to dubbo/src/invoker/clone_invoker.rs index 20e1811..fe621b8 100644 --- a/dubbo/src/cluster/clone_invoker.rs +++ b/dubbo/src/invoker/clone_invoker.rs @@ -12,6 +12,8 @@ use tower_service::Service; use crate::StdError; +use super::clone_body::CloneBody; + enum Inner<S> { Invalid, Ready(S), @@ -149,43 +151,42 @@ impl<S> Drop for ReadyService<S> { } } -pub struct CloneInvoker<Inv, Req> +pub struct CloneInvoker<Inv> where - Inv: Service<Req> + Send + 'static, + Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, - Req: Send { - inner: Buffer<ReadyService<Inv>, Req>, + inner: Buffer<ReadyService<Inv>, http::Request<CloneBody>>, rx: Receiver<ObserveState>, poll: ReusableBoxFuture<'static, ObserveState>, polling: bool, } -impl<Inv, Req> CloneInvoker<Inv, Req> +impl<Inv> CloneInvoker<Inv> where - Inv: Service<Req> + Send + 'static, + Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, - Req: Send + 'static { + const MAX_INVOKER_BUFFER_SIZE: usize = 16; + pub fn new(invoker: Inv) -> Self { let (ready_service, rx) = ReadyService::new(invoker); - let buffer: Buffer<ReadyService<Inv>, Req> = Buffer::new(ready_service, 1024); + let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> = Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE); Self { inner: buffer, rx, polling: false, poll: ReusableBoxFuture::new(futures::future::pending()) } } } -impl<Inv, Req> Service<Req> for CloneInvoker<Inv, Req> +impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv> where - Inv: Service<Req> + Send + 'static, + Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, - Req: Send + 'static { type Response = Inv::Response; @@ -229,18 +230,17 @@ where } } - fn call(&mut self, req: Req) -> Self::Future { + fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { Box::pin(self.inner.call(req)) } } -impl<Inv, Req> Clone for CloneInvoker<Inv, Req> +impl<Inv> Clone for CloneInvoker<Inv> where - Inv: Service<Req> + Send + 'static, + Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, - Req: Send { fn clone(&self) -> Self { Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, poll: ReusableBoxFuture::new(futures::future::pending())} diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs index 81bcb68..a8179ee 100644 --- a/dubbo/src/invoker/mod.rs +++ b/dubbo/src/invoker/mod.rs @@ -1,75 +1,21 @@ use dubbo_base::Url; -use tower_service::Service; -use crate::{codegen::TripleInvoker, param::Param, svc::NewService}; +use crate::{codegen::TripleInvoker, svc::NewService, invoker::clone_invoker::CloneInvoker}; -#[derive(Clone)] -pub struct NewInvoker { - url: Url -} +pub mod clone_body; +pub mod clone_invoker; -pub enum InvokerComponent { - TripleInvoker(TripleInvoker) -} +pub struct NewInvoker; -impl NewInvoker { - pub fn new(url: Url) -> Self { - Self { - url - } - } -} - -impl From<String> for NewInvoker { - fn from(url: String) -> Self { - Self { - url: Url::from_url(&url).unwrap() - } - } -} - -impl Param<Url> for NewInvoker { - fn param(&self) -> Url { - self.url.clone() - } -} - -impl NewService<()> for NewInvoker { - type Service = InvokerComponent; - fn new_service(&self, _: ()) -> Self::Service { - // todo create another invoker - InvokerComponent::TripleInvoker(TripleInvoker::new(self.url.clone())) - } -} - - -impl<B> Service<http::Request<B>> for InvokerComponent -where - B: http_body::Body + Unpin + Send + 'static, - B::Error: Into<crate::Error>, - B::Data: Send + Unpin, -{ - type Response = http::Response<crate::BoxBody>; - type Error = crate::Error; +impl NewService<String> for NewInvoker { + type Service = CloneInvoker<TripleInvoker>; - type Future = crate::BoxFuture<Self::Response, Self::Error>; + fn new_service(&self, url: String) -> Self::Service { + // todo create another invoker by url protocol - fn call(&mut self, req: http::Request<B>) -> Self::Future { - match self { - InvokerComponent::TripleInvoker(invoker) => invoker.call(req), - } + let url = Url::from_url(&url).unwrap(); + CloneInvoker::new(TripleInvoker::new(url)) } - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Result<(), Self::Error>> { - match self { - InvokerComponent::TripleInvoker(invoker) => <TripleInvoker as Service<http::Request<B>>>::poll_ready(invoker, cx), - } - } -} - -// InvokerComponent::TripleInvoker(invoker) => <TripleInvoker as Service<http::Request<B>>>::poll_ready(invoker, cx), \ No newline at end of file +} \ No newline at end of file diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs index 822794c..334350e 100644 --- a/dubbo/src/loadbalancer/mod.rs +++ b/dubbo/src/loadbalancer/mod.rs @@ -1,8 +1,12 @@ use futures_core::future::BoxFuture; -use tower::{discover::ServiceList, ServiceExt}; +use tower::ServiceExt; +use tower::discover::ServiceList; use tower_service::Service; -use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param}; +use crate::invoker::clone_body::CloneBody; +use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker}; + +use crate::protocol::triple::triple_invoker::TripleInvoker; pub struct NewLoadBalancer<N> { inner: N, @@ -40,28 +44,21 @@ where let svc = self.inner.new_service(target); LoadBalancer { - inner: svc + inner: svc, } } } -impl<N, Req, Nsv> Service<Req> for LoadBalancer<N> +impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N> where - Req: Send + 'static, // Routes service - N: Service<(), Response = Vec<Nsv>> + Clone, + N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone, N::Error: Into<StdError> + Send, N::Future: Send + 'static, - // new invoker - Nsv: NewService<()> + Send, - Nsv::Service: Service<Req> + Send, - // invoker - <Nsv::Service as Service<Req>>::Error: Into<StdError> + Send, - <Nsv::Service as Service<Req>>::Future: Send + 'static, { - type Response = <Nsv::Service as Service<Req>>::Response; + type Response = <CloneInvoker<TripleInvoker> as Service<http::Request<CloneBody>>>::Response; type Error = StdError; @@ -69,30 +66,28 @@ where fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> { self.inner.poll_ready(cx).map_err(Into::into) - + } - fn call(&mut self, req: Req) -> Self::Future { + fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { let routes = self.inner.call(()); let fut = async move { let routes = routes.await; - let routes: Vec<Nsv> = match routes { + let routes: Vec<CloneInvoker<TripleInvoker>> = match routes { Err(e) => return Err(Into::<StdError>::into(e)), Ok(routes) => routes }; - - let service_list: Vec<_> = routes.iter().map(|inv| { - let invoker = inv.new_service(()); + + let service_list: Vec<_> = routes.into_iter().map(|invoker| { tower::load::Constant::new(invoker, 1) - }).collect(); let service_list = ServiceList::new(service_list); let p2c = tower::balance::p2c::Balance::new(service_list); - + p2c.oneshot(req).await }; diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 685c434..71eae90 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -16,15 +16,15 @@ */ use dubbo_base::Url; +use http::{Uri, HeaderValue}; use std::{ fmt::{Debug, Formatter}, str::FromStr, }; use tower_service::Service; -use crate::triple::transport::{connection::Connection, self}; +use crate::{triple::transport::{connection::Connection, self}, invoker::clone_body::CloneBody}; -#[derive(Clone)] pub struct TripleInvoker { url: Url, conn: Connection, @@ -35,7 +35,7 @@ impl TripleInvoker { let uri = http::Uri::from_str(&url.to_url()).unwrap(); Self { url, - conn: Connection::new().with_host(uri), + conn: Connection::new().with_host(uri).build(), } } } @@ -46,27 +46,104 @@ impl Debug for TripleInvoker { } } -impl<B> Service<http::Request<B>> for TripleInvoker -where - B: http_body::Body + Unpin + Send + 'static, - B::Error: Into<crate::Error>, - B::Data: Send + Unpin, -{ +impl TripleInvoker { + pub fn map_request( + &self, + req: http::Request<CloneBody>, + ) -> http::Request<CloneBody> { + + let (parts, body) = req.into_parts(); + + let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap(); + + let authority = self.url.clone().get_ip_port(); + + let uri = Uri::builder().scheme("http").authority(authority).path_and_query(path_and_query).build().unwrap(); + + let mut req = hyper::Request::builder() + .version(http::Version::HTTP_2) + .uri(uri.clone()) + .method("POST") + .body(body) + .unwrap(); + + // *req.version_mut() = http::Version::HTTP_2; + req.headers_mut() + .insert("method", HeaderValue::from_static("POST")); + req.headers_mut().insert( + "scheme", + HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(), + ); + req.headers_mut() + .insert("path", HeaderValue::from_str(uri.path()).unwrap()); + req.headers_mut().insert( + "authority", + HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(), + ); + req.headers_mut().insert( + "content-type", + HeaderValue::from_static("application/grpc+proto"), + ); + req.headers_mut() + .insert("user-agent", HeaderValue::from_static("dubbo-rust/0.1.0")); + req.headers_mut() + .insert("te", HeaderValue::from_static("trailers")); + req.headers_mut().insert( + "tri-service-version", + HeaderValue::from_static("dubbo-rust/0.1.0"), + ); + req.headers_mut() + .insert("tri-service-group", HeaderValue::from_static("cluster")); + req.headers_mut().insert( + "tri-unit-info", + HeaderValue::from_static("dubbo-rust/0.1.0"), + ); + // if let Some(_encoding) = self.send_compression_encoding { + + // } + + req.headers_mut() + .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); + + req.headers_mut().insert( + "grpc-accept-encoding", + http::HeaderValue::from_static("gzip"), + ); + + // // const ( + // // TripleContentType = "application/grpc+proto" + // // TripleUserAgent = "grpc-go/1.35.0-dev" + // // TripleServiceVersion = "tri-service-version" + // // TripleAttachement = "tri-attachment" + // // TripleServiceGroup = "tri-service-group" + // // TripleRequestID = "tri-req-id" + // // TripleTraceID = "tri-trace-traceid" + // // TripleTraceRPCID = "tri-trace-rpcid" + // // TripleTraceProtoBin = "tri-trace-proto-bin" + // // TripleUnitInfo = "tri-unit-info" + // // ) + req + } +} + +impl Service<http::Request<CloneBody>> for TripleInvoker { type Response = http::Response<crate::BoxBody>; type Error = crate::Error; type Future = crate::BoxFuture<Self::Response, Self::Error>; - fn call(&mut self, req: http::Request<B>) -> Self::Future { - self.conn.call(req) - } - fn poll_ready( &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), Self::Error>> { - <transport::connection::Connection as Service<http::Request<B>>>::poll_ready(&mut self.conn, cx) + <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(&mut self.conn, cx) + } + + fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { + let req = self.map_request(req); + + self.conn.call(req) } } diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs index 69300f2..9b6dca5 100644 --- a/dubbo/src/registry/n_registry.rs +++ b/dubbo/src/registry/n_registry.rs @@ -6,9 +6,9 @@ use tokio::sync::mpsc::{Receiver, channel}; use tower::discover::Change; -use crate::{StdError, invoker::NewInvoker}; +use crate::StdError; -type DiscoverStream = Receiver<Result<Change<String, NewInvoker>, StdError>>; +type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>; #[async_trait] pub trait Registry { @@ -51,19 +51,19 @@ impl ArcRegistry { impl Registry for ArcRegistry { async fn register(&self, url: Url) -> Result<(), StdError> { - self.register(url).await + self.inner.register(url).await } async fn unregister(&self, url: Url) -> Result<(), StdError> { - self.unregister(url).await + self.inner.unregister(url).await } async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> { - self.subscribe(service_name).await + self.inner.subscribe(service_name).await } async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { - self.unsubscribe(url).await + self.inner.unsubscribe(url).await } } @@ -81,7 +81,11 @@ impl Registry for RegistryComponent { } async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> { - todo!() + match self { + RegistryComponent::NacosRegistry => todo!(), + RegistryComponent::ZookeeperRegistry => todo!(), + RegistryComponent::StaticRegistry(registry) => registry.subscribe(service_name).await, + } } async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { @@ -113,8 +117,7 @@ impl Registry for StaticRegistry { async fn subscribe(&self, service_name: String) -> Result<DiscoverStream, StdError> { let (tx, rx) = channel(self.urls.len()); for url in self.urls.iter() { - let invoker = NewInvoker::new(url.clone()); - let change = Ok(Change::Insert(service_name.clone(), invoker)); + let change = Ok(Change::Insert(url.to_url(), ())); tx.send(change).await?; } diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs index dfbe73e..cff3cce 100644 --- a/dubbo/src/route/mod.rs +++ b/dubbo/src/route/mod.rs @@ -1,38 +1,35 @@ -use std::{sync::{Arc, Mutex}, collections::HashMap}; +use std::pin::Pin; +use dubbo_logger::tracing::debug; use futures_core::{Future, ready}; -use futures_util::future::Ready; -use pin_project::pin_project; -use tokio::{sync::watch, pin}; +use futures_util::{future::Ready, FutureExt, TryFutureExt}; use tower::{util::FutureService, buffer::Buffer}; use tower_service::Service; -use crate::{StdError, codegen::RpcInvocation, svc::NewService, param::Param, invocation::Invocation}; +use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker}; pub struct NewRoutes<N> { inner: N, } -pub struct NewRoutesCache<N> -where - N: NewService<RpcInvocation> -{ - inner: N, - cache: Arc<Mutex<HashMap<String, N::Service>>>, -} -#[pin_project] -pub struct NewRoutesFuture<N, T> { - #[pin] - inner: N, +pub struct NewRoutesFuture<S, T> { + inner: RoutesFutureInnerState<S>, target: T, } + + +pub enum RoutesFutureInnerState<S> { + Service(S), + Future(Pin<Box<dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>> + Send + 'static>>), + Ready(Vec<CloneInvoker<TripleInvoker>>), +} + #[derive(Clone)] -pub struct Routes<Nsv, T> { +pub struct Routes<T> { target: T, - new_invokers: Vec<Nsv>, - invokers_receiver: watch::Receiver<Vec<Nsv>>, + invokers: Vec<CloneInvoker<TripleInvoker>> } impl<N> NewRoutes<N> { @@ -43,154 +40,101 @@ impl<N> NewRoutes<N> { } } +impl<N> NewRoutes<N> { + const MAX_ROUTE_BUFFER_SIZE: usize = 16; -impl<N, T, Nsv> NewService<T> for NewRoutes<N> -where - T: Param<RpcInvocation> + Clone + Send + 'static, - // NewDirectory - N: NewService<T>, - // Directory - N::Service: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin + Send + 'static, - <N::Service as Service<()>>::Error: Into<StdError>, - // new invoker service - Nsv: NewService<()> + Clone + Send + Sync + 'static, -{ - - type Service = Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<Nsv, T>>, ()>; - - fn new_service(&self, target: T) -> Self::Service { - let inner = self.inner.new_service(target.clone()); - - Buffer::new(FutureService::new(NewRoutesFuture { - inner, - target, - }), 1024) - } -} -impl<N, Nsv> NewRoutesCache<N> -where - N: NewService<RpcInvocation>, - <N as NewService<RpcInvocation>>::Service: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin + Send + 'static, - <N::Service as Service<()>>::Error: Into<StdError>, - Nsv: NewService<()> + Clone + Send + Sync + 'static, - -{ - pub fn layer() -> impl tower_layer::Layer<N, Service = NewRoutesCache<NewRoutes<N>>> { + pub fn layer() -> impl tower_layer::Layer<N, Service = Self> { tower_layer::layer_fn(|inner: N| { - NewRoutesCache::new(NewRoutes::new(inner)) + NewRoutes::new(inner) }) } - - } -impl<N> NewRoutesCache<N> +impl<N, T> NewService<T> for NewRoutes<N> where - N: NewService<RpcInvocation> -{ - pub fn new(inner: N) -> Self { - Self { - inner, - cache: Default::default(), - } - } -} + T: Param<RpcInvocation> + Clone + Send + Unpin + 'static, + // NewDirectory + N: NewService<T>, + // Directory + N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin + Send + 'static, + <N::Service as Service<()>>::Error: Into<StdError>, + <N::Service as Service<()>>::Future: Send + 'static, +{ -impl<N, T> NewService<T> for NewRoutesCache<N> -where - T: Param<RpcInvocation>, - N: NewService<RpcInvocation>, - N::Service: Clone, -{ - type Service = N::Service; + type Service = Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>; fn new_service(&self, target: T) -> Self::Service { - let rpc_inv = target.param(); - let service_name = rpc_inv.get_target_service_unique_name(); - - let mut cache = self.cache.lock().expect("RoutesCache get lock failed"); - - let service = cache.get(&service_name); - match service { - Some(service) => service.clone(), - None => { - let service = self.inner.new_service(rpc_inv); - cache.insert(service_name, service.clone()); - service - } - } + let inner = self.inner.new_service(target.clone()); + + Buffer::new(FutureService::new(NewRoutesFuture { + inner: RoutesFutureInnerState::Service(inner), + target, + }), Self::MAX_ROUTE_BUFFER_SIZE) } } -impl<N, T, Nsv> Future for NewRoutesFuture<N, T> +impl<N, T> Future for NewRoutesFuture<N, T> where - T: Param<RpcInvocation> + Clone, + T: Param<RpcInvocation> + Clone + Unpin, // Directory - N: Service<(), Response = watch::Receiver<Vec<Nsv>>> + Unpin, + N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin, N::Error: Into<StdError>, - // new invoker service - Nsv: NewService<()> + Clone, + N::Future: Send + 'static, { - type Output = Result<Routes<Nsv, T>, StdError>; + type Output = Result<Routes<T>, StdError>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> { let this = self.get_mut(); - let target = this.target.clone(); - - let _ = ready!(this.inner.poll_ready(cx)).map_err(Into::into)?; - - - let call = this.inner.call(()); - pin!(call); - - let mut invokers_receiver = ready!(call.poll(cx).map_err(Into::into))?; - let new_invokers = { - let wait_for = invokers_receiver.wait_for(|invs|!invs.is_empty()); - pin!(wait_for); - - let changed = ready!(wait_for.poll(cx))?; - - changed.clone() - }; - - - std::task::Poll::Ready(Ok(Routes { - invokers_receiver, - new_invokers, - target, - })) + loop { + match this.inner { + RoutesFutureInnerState::Service(ref mut service) => { + debug!("RoutesFutureInnerState::Service"); + let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?; + let fut = service.call(()).map_err(|e|e.into()).boxed(); + this.inner = RoutesFutureInnerState::Future(fut); + }, + RoutesFutureInnerState::Future(ref mut futures) => { + debug!("RoutesFutureInnerState::Future"); + let invokers = ready!(futures.as_mut().poll(cx))?; + this.inner = RoutesFutureInnerState::Ready(invokers); + }, + RoutesFutureInnerState::Ready(ref invokers) => { + debug!("RoutesFutureInnerState::Ready"); + let target = this.target.clone(); + return std::task::Poll::Ready(Ok(Routes { + invokers: invokers.clone(), + target, + })); + }, + } + } + } } -impl<Nsv,T> Service<()> for Routes<Nsv, T> +impl<T> Service<()> for Routes<T> where T: Param<RpcInvocation> + Clone, - // new invoker service - Nsv: NewService<()> + Clone, { - type Response = Vec<Nsv>; + type Response = Vec<CloneInvoker<TripleInvoker>>; type Error = StdError; type Future = Ready<Result<Self::Response, Self::Error>>; fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> { - let has_change = self.invokers_receiver.has_changed()?; - if has_change { - self.new_invokers = self.invokers_receiver.borrow_and_update().clone(); - } std::task::Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { // some router operator // if new_invokers changed, send new invokers to routes_rx after router operator - futures_util::future::ok(self.new_invokers.clone()) + futures_util::future::ok(self.invokers.clone()) } } \ No newline at end of file diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 89d7a00..ed08021 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::{ - utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::{NewRoutes, NewRoutesCache}, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody, + utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::NewRoutes, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody, }; use aws_smithy_http::body::SdkBody; @@ -30,7 +30,7 @@ pub type ClientBoxService = BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>; -pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutesCache<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>>; +pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>; #[derive(Default)] pub struct ClientBuilder { @@ -102,7 +102,7 @@ impl ClientBuilder { let mk_service = ServiceBuilder::new() .layer(NewCluster::layer()) .layer(NewLoadBalancer::layer()) - .layer(NewRoutesCache::layer()) + .layer(NewRoutes::layer()) .layer(NewCachedDirectory::layer()) .service(registry); diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 091e020..9bcf144 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -153,6 +153,7 @@ impl TripleClient { let request = http::Request::builder() + .header("path", path.to_string()) .body(body).unwrap(); @@ -215,6 +216,7 @@ impl TripleClient { let request = http::Request::builder() + .header("path", path.to_string()) .body(body).unwrap(); @@ -259,6 +261,7 @@ impl TripleClient { let request = http::Request::builder() + .header("path", path.to_string()) .body(body).unwrap(); @@ -321,6 +324,7 @@ impl TripleClient { let request = http::Request::builder() + .header("path", path.to_string()) .body(body).unwrap(); diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs index 1b97875..3bbaa17 100644 --- a/dubbo/src/triple/transport/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -15,19 +15,18 @@ * limitations under the License. */ -use std::task::Poll; - -use dubbo_logger::tracing::debug; use hyper::client::{conn::Builder, service::Connect}; use tower_service::Service; -use crate::{boxed, triple::transport::connector::get_connector}; +use crate::{boxed, triple::transport::connector::get_connector, StdError, invoker::clone_body::CloneBody}; + +type HyperConnect = Connect<crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>, CloneBody, http::Uri>; -#[derive(Debug, Clone)] pub struct Connection { host: hyper::Uri, connector: &'static str, builder: Builder, + connect: Option<HyperConnect>, } impl Default for Connection { @@ -42,6 +41,7 @@ impl Connection { host: hyper::Uri::default(), connector: "http", builder: Builder::new(), + connect: None, } } @@ -59,14 +59,19 @@ impl Connection { self.builder = builder; self } + + pub fn build(mut self) -> Self { + let builder = self.builder.clone().http2_only(true).to_owned(); + let hyper_connect: HyperConnect = Connect::new(get_connector(self.connector), builder); + self.connect = Some(hyper_connect); + self + + } } -impl<ReqBody> Service<http::Request<ReqBody>> for Connection -where - ReqBody: http_body::Body + Unpin + Send + 'static, - ReqBody::Data: Send + Unpin, - ReqBody::Error: Into<crate::Error>, -{ +impl Service<http::Request<CloneBody>> for Connection { + + type Response = http::Response<crate::BoxBody>; type Error = crate::Error; @@ -75,25 +80,36 @@ where fn poll_ready( &mut self, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) + match self.connect { + None => { + panic!("connection must be built before use") + }, + Some(ref mut connect) => { + connect.poll_ready(cx).map_err(|e|e.into()) + } + } } - fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future { - let builder = self.builder.clone().http2_only(true).to_owned(); - let mut connector = Connect::new(get_connector(self.connector), builder); - let uri = self.host.clone(); - let fut = async move { - debug!("send base call to {}", uri); - let mut con = connector.call(uri).await.unwrap(); - - con.call(req) - .await - .map_err(|err| err.into()) - .map(|res| res.map(boxed)) - }; - - Box::pin(fut) + fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { + + match self.connect { + None => { + panic!("connection must be built before use") + }, + Some(ref mut connect) => { + let uri = self.host.clone(); + let call_fut = connect.call(uri); + let fut = async move { + let mut con = call_fut.await.unwrap(); + con.call(req).await + .map_err(|err| err.into()) + .map(|res| res.map(boxed)) + }; + + return Box::pin(fut) + } + } } } diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index e756d5c..07c58fe 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -17,7 +17,7 @@ pub mod echo_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use dubbo::codegen::*; /// Echo is the echo service. - #[derive(Debug, Clone, Default)] + #[derive(Clone)] pub struct EchoClient { inner: TripleClient, } diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs index 1d59cdf..afa1b20 100644 --- a/examples/greeter/src/greeter/client.rs +++ b/examples/greeter/src/greeter/client.rs @@ -21,21 +21,17 @@ pub mod protos { } use std::env; - + use dubbo::codegen::*; -use dubbo_base::Url; use futures_util::StreamExt; use protos::{greeter_client::GreeterClient, GreeterRequest}; -use registry_nacos::NacosRegistry; -use registry_zookeeper::ZookeeperRegistry; #[tokio::main] async fn main() { dubbo_logger::init(); - - let mut builder = ClientBuilder::new(); - builder.with_host("http://127.0.0.1:8888"); + + let builder = ClientBuilder::new().with_host("http://127.0.0.1:8888"); let mut cli = GreeterClient::new(builder); @@ -47,7 +43,7 @@ async fn main() { .await; let resp = match resp { Ok(resp) => resp, - Err(err) => return println!("{:?}", err), + Err(err) => return println!("response error: {:?}", err), }; let (_parts, body) = resp.into_parts(); println!("Response: {:?}", body); diff --git a/examples/greeter/src/greeter/server.rs b/examples/greeter/src/greeter/server.rs index 94e4e53..c3bc4c5 100644 --- a/examples/greeter/src/greeter/server.rs +++ b/examples/greeter/src/greeter/server.rs @@ -22,7 +22,7 @@ use futures_util::{Stream, StreamExt}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use dubbo::{codegen::*, Dubbo}; +use dubbo::{codegen::*, Dubbo, registry::memory_registry::MemoryRegistry}; use dubbo_config::RootConfig; use dubbo_logger::{ tracing::{info, span}, @@ -50,7 +50,7 @@ async fn main() { register_server(GreeterServerImpl { name: "greeter".to_string(), }); - let zkr = ZookeeperRegistry::default(); + // let zkr: ZookeeperRegistry = ZookeeperRegistry::default(); let r = RootConfig::new(); let r = match r.load() { Ok(config) => config, @@ -58,7 +58,9 @@ async fn main() { }; let mut f = Dubbo::new() .with_config(r) - .add_registry("zookeeper", Box::new(zkr)); + .add_registry("memory_registry", Box::new(MemoryRegistry::new())); + + f.start().await; }