This is an automated email from the ASF dual-hosted git repository. yangyang pushed a commit to branch refact/cluster in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/refact/cluster by this push: new 9b93de5 Mod: format code and fix some warnings (#167) 9b93de5 is described below commit 9b93de56f296da3c1c2a55915c700e36b86ac3bb Author: Yang Yang <962032...@qq.com> AuthorDate: Sun Dec 10 13:53:45 2023 +0800 Mod: format code and fix some warnings (#167) * style(dubbo): cargo fmt --all * style(dubbo): cargo fix --lib -p dubbo --allow-dirty * chore(github): update branch in pr --- .github/workflows/github-actions.yml | 4 +- common/base/src/lib.rs | 2 +- dubbo/src/cluster/failover.rs | 25 ++-- dubbo/src/cluster/mod.rs | 38 +++--- dubbo/src/codegen.rs | 3 +- dubbo/src/directory/mod.rs | 173 +++++++++++++--------------- dubbo/src/invoker/clone_body.rs | 50 +++----- dubbo/src/invoker/clone_invoker.rs | 124 ++++++++++---------- dubbo/src/invoker/mod.rs | 6 +- dubbo/src/lib.rs | 12 +- dubbo/src/loadbalancer/mod.rs | 62 +++++----- dubbo/src/param.rs | 5 +- dubbo/src/protocol/mod.rs | 4 +- dubbo/src/protocol/triple/triple_invoker.rs | 36 +++--- dubbo/src/registry/mod.rs | 3 +- dubbo/src/registry/n_registry.rs | 32 ++--- dubbo/src/route/mod.rs | 108 +++++++++-------- dubbo/src/svc.rs | 28 ++--- dubbo/src/triple/client/builder.rs | 38 +++--- dubbo/src/triple/client/triple.rs | 28 ++--- dubbo/src/triple/transport/connection.rs | 31 ++--- 21 files changed, 386 insertions(+), 426 deletions(-) diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 19d8c4d..878a5e1 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -6,7 +6,9 @@ on: push: branches: ["*"] pull_request: - branches: ["*"] + branches: + - '*' + - 'refact/*' jobs: diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs index cb3bc09..b97b342 100644 --- a/common/base/src/lib.rs +++ b/common/base/src/lib.rs @@ -23,4 +23,4 @@ pub mod node; pub mod url; pub use node::Node; -pub use url::Url; \ No newline at end of file +pub use url::Url; diff --git a/dubbo/src/cluster/failover.rs b/dubbo/src/cluster/failover.rs index 1a8149c..8a00c9f 100644 --- a/dubbo/src/cluster/failover.rs +++ b/dubbo/src/cluster/failover.rs @@ -2,18 +2,17 @@ use std::task::Poll; use futures_util::future; use http::Request; -use tower::{ServiceExt, retry::Retry, util::Oneshot}; +use tower::{retry::Retry, util::Oneshot, ServiceExt}; use tower_service::Service; - + use crate::StdError; pub struct Failover<N> { - inner: N // loadbalancer service + inner: N, // loadbalancer service } #[derive(Clone)] pub struct FailoverPolicy; - impl<N> Failover<N> { pub fn new(inner: N) -> Self { @@ -25,14 +24,13 @@ impl<B, Res, E> tower::retry::Policy<Request<B>, Res, E> for FailoverPolicy where B: http_body::Body + Clone, { - type Future = future::Ready<Self>; - fn retry(&self, req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> { + fn retry(&self, _req: &Request<B>, result: Result<&Res, &E>) -> Option<Self::Future> { //TODO some error handling or logging match result { Ok(_) => None, - Err(_) => Some(future::ready(self.clone())) + Err(_) => Some(future::ready(self.clone())), } } @@ -43,21 +41,18 @@ where *clone.headers_mut() = req.headers().clone(); *clone.version_mut() = req.version(); - Some(clone) } } - - -impl<N, B> Service<Request<B>> for Failover<N> +impl<N, B> Service<Request<B>> for Failover<N> where // B is CloneBody<B> B: http_body::Body + Clone, // loadbalancer service - N: Service<Request<B>> + Clone + 'static , + N: Service<Request<B>> + Clone + 'static, N::Error: Into<StdError>, - N::Future: Send + N::Future: Send, { type Response = N::Response; @@ -68,9 +63,9 @@ where fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { self.inner.poll_ready(cx) } - + fn call(&mut self, req: Request<B>) -> Self::Future { let retry = Retry::new(FailoverPolicy, self.inner.clone()); retry.oneshot(req) } -} \ No newline at end of file +} diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index 0b1654a..1a20c16 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,11 +15,12 @@ * limitations under the License. */ - use http::Request; use tower_service::Service; -use crate::{codegen::RpcInvocation, svc::NewService, param::Param, invoker::clone_body::CloneBody}; +use crate::{ + codegen::RpcInvocation, invoker::clone_body::CloneBody, param::Param, svc::NewService, +}; use self::failover::Failover; @@ -30,12 +31,10 @@ pub struct NewCluster<N> { } pub struct Cluster<S> { - inner: S // failover service + inner: S, // failover service } - impl<N> NewCluster<N> { - pub fn layer() -> impl tower_layer::Layer<N, Service = Self> { tower_layer::layer_fn(|inner: N| { NewCluster { @@ -43,45 +42,44 @@ impl<N> NewCluster<N> { } }) } +} -} - -impl<S, T> NewService<T> for NewCluster<S> +impl<S, T> NewService<T> for NewCluster<S> where - T: Param<RpcInvocation>, + T: Param<RpcInvocation>, // new loadbalancer service S: NewService<T>, -{ - +{ type Service = Cluster<Failover<S::Service>>; - - fn new_service(&self, target: T) -> Self::Service { + fn new_service(&self, target: T) -> Self::Service { Cluster { - inner: Failover::new(self.inner.new_service(target)) + inner: Failover::new(self.inner.new_service(target)), } } } - -impl<S> Service<Request<hyper::Body>> for Cluster<S> + +impl<S> Service<Request<hyper::Body>> for Cluster<S> where S: Service<Request<CloneBody>>, { - type Response = S::Response; type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> { + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), Self::Error>> { self.inner.poll_ready(cx) } - + fn call(&mut self, req: Request<hyper::Body>) -> Self::Future { let (parts, body) = req.into_parts(); let clone_body = CloneBody::new(body); - let req = Request::from_parts(parts, clone_body); + let req = Request::from_parts(parts, clone_body); self.inner.call(req) } } diff --git a/dubbo/src/codegen.rs b/dubbo/src/codegen.rs index 38f8f1d..452f560 100644 --- a/dubbo/src/codegen.rs +++ b/dubbo/src/codegen.rs @@ -44,8 +44,7 @@ pub use super::{ pub use crate::{ filter::{service::FilterService, Filter}, triple::{ - client::builder::ClientBuilder, - server::builder::ServerBuilder, + client::builder::ClientBuilder, server::builder::ServerBuilder, transport::connection::Connection, }, }; diff --git a/dubbo/src/directory/mod.rs b/dubbo/src/directory/mod.rs index a4cf466..84900aa 100644 --- a/dubbo/src/directory/mod.rs +++ b/dubbo/src/directory/mod.rs @@ -16,107 +16,109 @@ */ use std::{ - task::{Context, Poll}, collections::HashMap, sync::{Arc, Mutex}, pin::Pin, + collections::HashMap, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use crate::{ + codegen::{RpcInvocation, TripleInvoker}, + invocation::Invocation, + invoker::{clone_invoker::CloneInvoker, NewInvoker}, + param::Param, + registry::n_registry::Registry, + svc::NewService, + StdError, }; - -use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, invocation::Invocation, registry::n_registry::Registry, invoker::{NewInvoker,clone_invoker::CloneInvoker}, svc::NewService, param::Param}; use dubbo_logger::tracing::debug; use futures_core::ready; use futures_util::future; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tower::{ - discover::{Change, Discover}, buffer::Buffer, + buffer::Buffer, + discover::{Change, Discover}, }; use tower_service::Service; -type BufferedDirectory = Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>; +type BufferedDirectory = + Buffer<Directory<ReceiverStream<Result<Change<String, ()>, StdError>>>, ()>; pub struct NewCachedDirectory<N> where N: Registry + Clone + Send + Sync + 'static, { - inner: CachedDirectory<NewDirectory<N>, RpcInvocation> + inner: CachedDirectory<NewDirectory<N>, RpcInvocation>, } - -pub struct CachedDirectory<N, T> +pub struct CachedDirectory<N, T> where - // NewDirectory - N: NewService<T> + // NewDirectory + N: NewService<T>, { inner: N, - cache: Arc<Mutex<HashMap<String, N::Service>>> + cache: Arc<Mutex<HashMap<String, N::Service>>>, } - pub struct NewDirectory<N> { // registry inner: N, } - pub struct Directory<D> { directory: HashMap<String, CloneInvoker<TripleInvoker>>, discover: D, new_invoker: NewInvoker, } - - -impl<N> NewCachedDirectory<N> +impl<N> NewCachedDirectory<N> where N: Registry + Clone + Send + Sync + 'static, { - pub fn layer() -> impl tower_layer::Layer<N, Service = Self> { tower_layer::layer_fn(|inner: N| { NewCachedDirectory { - // inner is registry - inner: CachedDirectory::new(NewDirectory::new(inner)), + // inner is registry + inner: CachedDirectory::new(NewDirectory::new(inner)), } }) } } - impl<N, T> NewService<T> for NewCachedDirectory<N> where T: Param<RpcInvocation>, // service registry N: Registry + Clone + Send + Sync + 'static, { - type Service = BufferedDirectory; + type Service = BufferedDirectory; fn new_service(&self, target: T) -> Self::Service { - self.inner.new_service(target.param()) } -} - - -impl<N, T> CachedDirectory<N, T> +} + +impl<N, T> CachedDirectory<N, T> where - N: NewService<T> + N: NewService<T>, { - pub fn new(inner: N) -> Self { CachedDirectory { inner, - cache: Default::default() + cache: Default::default(), } - } -} - - -impl<N, T> NewService<T> for CachedDirectory<N, T> + } +} + +impl<N, T> NewService<T> for CachedDirectory<N, T> where T: Param<RpcInvocation>, // NewDirectory N: NewService<T>, // Buffered directory - N::Service: Clone + N::Service: Clone, { type Service = N::Service; @@ -124,44 +126,35 @@ where let rpc_invocation = target.param(); let service_name = rpc_invocation.get_target_service_unique_name(); let mut cache = self.cache.lock().expect("cached directory lock failed."); - let value = cache.get(&service_name).map(|val|val.clone()); + let value = cache.get(&service_name).map(|val| val.clone()); match value { None => { let new_service = self.inner.new_service(target); cache.insert(service_name, new_service.clone()); new_service - }, - Some(value) => value - } + } + Some(value) => value, + } } } - impl<N> NewDirectory<N> { - const MAX_DIRECTORY_BUFFER_SIZE: usize = 16; pub fn new(inner: N) -> Self { - NewDirectory { - inner - } + NewDirectory { inner } } -} - - +} -impl<N, T> NewService<T> for NewDirectory<N> +impl<N, T> NewService<T> for NewDirectory<N> where T: Param<RpcInvocation>, // service registry - N: Registry + Clone + Send + Sync + 'static, + N: Registry + Clone + Send + Sync + 'static, { - type Service = BufferedDirectory; + type Service = BufferedDirectory; - - fn new_service(&self, target: T) -> Self::Service { - let service_name = target.param().get_target_service_unique_name(); let registry = self.inner.clone(); @@ -172,39 +165,35 @@ where let receiver = registry.subscribe(service_name).await; debug!("discover start!"); match receiver { - Err(e) => { + Err(_e) => { // error!("discover stream error: {}", e); debug!("discover stream error"); - }, - Ok(mut receiver) => { - loop { - let change = receiver.recv().await; - debug!("receive change: {:?}", change); - match change { - None => { - debug!("discover stream closed."); - break; - }, - Some(change) => { - let _ = tx.send(change).await; - } + } + Ok(mut receiver) => loop { + let change = receiver.recv().await; + debug!("receive change: {:?}", change); + match change { + None => { + debug!("discover stream closed."); + break; + } + Some(change) => { + let _ = tx.send(change).await; } } - } + }, } + }); - }); - - Buffer::new(Directory::new(ReceiverStream::new(rx)), Self::MAX_DIRECTORY_BUFFER_SIZE) - } - -} - - -impl<D> Directory<D> { + Buffer::new( + Directory::new(ReceiverStream::new(rx)), + Self::MAX_DIRECTORY_BUFFER_SIZE, + ) + } +} +impl<D> Directory<D> { pub fn new(discover: D) -> Self { - Directory { directory: Default::default(), discover, @@ -213,13 +202,12 @@ impl<D> Directory<D> { } } - -impl<D> Service<()> for Directory<D> +impl<D> Service<()> for Directory<D> where // Discover - D: Discover<Key = String> + Unpin + Send, - D::Error: Into<StdError> -{ + D: Discover<Key = String> + Unpin + Send, + D::Error: Into<StdError>, +{ type Response = Vec<CloneInvoker<TripleInvoker>>; type Error = StdError; @@ -227,21 +215,22 @@ where type Future = future::Ready<Result<Self::Response, Self::Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - loop { let pin_discover = Pin::new(&mut self.discover); - let change = ready!(pin_discover.poll_discover(cx)).transpose().map_err(|e| e.into())?; + let change = ready!(pin_discover.poll_discover(cx)) + .transpose() + .map_err(|e| e.into())?; match change { Some(Change::Remove(key)) => { debug!("remove key: {}", key); self.directory.remove(&key); - }, + } Some(Change::Insert(key, _)) => { debug!("insert key: {}", key); let invoker = self.new_invoker.new_service(key.clone()); self.directory.insert(key, invoker); - }, - None => { + } + None => { debug!("stream closed"); return Poll::Ready(Ok(())); } @@ -250,7 +239,11 @@ where } fn call(&mut self, _: ()) -> Self::Future { - let vec = self.directory.values().map(|val|val.clone()).collect::<Vec<CloneInvoker<TripleInvoker>>>(); + let vec = self + .directory + .values() + .map(|val| val.clone()) + .collect::<Vec<CloneInvoker<TripleInvoker>>>(); future::ok(vec) } -} \ No newline at end of file +} diff --git a/dubbo/src/invoker/clone_body.rs b/dubbo/src/invoker/clone_body.rs index 5ce2e1f..4de8f89 100644 --- a/dubbo/src/invoker/clone_body.rs +++ b/dubbo/src/invoker/clone_body.rs @@ -7,20 +7,20 @@ use std::{ use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures_core::ready; - + use http::HeaderMap; use http_body::Body; use pin_project::pin_project; use thiserror::Error; use crate::StdError; - + #[derive(Error, Debug)] -#[error("buffered body reach max capacity.")] +#[error("buffered body reach max capacity.")] pub struct ReachMaxCapacityError; pub struct BufferedBody { - shared: Arc<Mutex<Option<OwnedBufferedBody>>>, + shared: Arc<Mutex<Option<OwnedBufferedBody>>>, owned: Option<OwnedBufferedBody>, replay_body: bool, replay_trailers: bool, @@ -34,10 +34,7 @@ pub struct OwnedBufferedBody { buf: InnerBuffer, } - - impl BufferedBody { - pub fn new(body: hyper::Body, buf_size: usize) -> Self { let size_hint = body.size_hint(); let is_empty = body.is_end_stream(); @@ -57,11 +54,9 @@ impl BufferedBody { size_hint, } } - } impl Clone for BufferedBody { - fn clone(&self) -> Self { Self { shared: self.shared.clone(), @@ -86,7 +81,6 @@ impl Drop for BufferedBody { } impl Body for BufferedBody { - type Data = BytesData; type Error = StdError; @@ -106,14 +100,10 @@ impl Body for BufferedBody { data.take().expect("cannot get shared buffered body.") }); - - if mut_self.replay_body { mut_self.replay_body = false; if owned_body.buf.has_remaining() { - return Poll::Ready(Some(Ok(BytesData::BufferedBytes( - owned_body.buf.clone(), - )))); + return Poll::Ready(Some(Ok(BytesData::BufferedBytes(owned_body.buf.clone())))); } if owned_body.buf.is_capped() { @@ -150,10 +140,8 @@ impl Body for BufferedBody { } else { owned_body.buf.push_bytes(data.copy_to_bytes(len)) }; - - Poll::Ready(Some(Ok(BytesData::OriginBytes(data)))) - + Poll::Ready(Some(Ok(BytesData::OriginBytes(data)))) } fn poll_trailers( @@ -170,7 +158,7 @@ impl Body for BufferedBody { data.take().expect("cannot get shared buffered body.") }); - + if mut_self.replay_trailers { mut_self.replay_trailers = false; if let Some(ref trailers) = owned_body.trailers { @@ -184,7 +172,7 @@ impl Body for BufferedBody { owned_body.trailers = trailers.clone(); trailers }); - return Poll::Ready(trailers.map_err(|e|e.into())); + return Poll::Ready(trailers.map_err(|e| e.into())); } Poll::Ready(Ok(None)) @@ -195,9 +183,11 @@ impl Body for BufferedBody { return true; } - let is_end = self.owned.as_ref() - .map(|owned|owned.body.is_end_stream()) - .unwrap_or(false); + let is_end = self + .owned + .as_ref() + .map(|owned| owned.body.is_end_stream()) + .unwrap_or(false); !self.replay_body && !self.replay_trailers && is_end } @@ -205,12 +195,8 @@ impl Body for BufferedBody { fn size_hint(&self) -> http_body::SizeHint { self.size_hint.clone() } - - } - - #[derive(Clone)] pub struct InnerBuffer { bufs: VecDeque<Bytes>, @@ -328,13 +314,12 @@ pub struct CloneBody(#[pin] BufferedBody); impl CloneBody { pub fn new(inner_body: hyper::Body) -> Self { - let inner_body = BufferedBody::new(inner_body, 1024 * 64); + let inner_body = BufferedBody::new(inner_body, 1024 * 64); CloneBody(inner_body) } } -impl Body for CloneBody{ - +impl Body for CloneBody { type Data = BytesData; type Error = StdError; @@ -350,7 +335,7 @@ impl Body for CloneBody{ self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { - self.project().0.poll_trailers(cx) + self.project().0.poll_trailers(cx) } fn size_hint(&self) -> http_body::SizeHint { @@ -358,9 +343,8 @@ impl Body for CloneBody{ } } - impl Clone for CloneBody { fn clone(&self) -> Self { Self(self.0.clone()) } -} \ No newline at end of file +} diff --git a/dubbo/src/invoker/clone_invoker.rs b/dubbo/src/invoker/clone_invoker.rs index fe621b8..c1fa00d 100644 --- a/dubbo/src/invoker/clone_invoker.rs +++ b/dubbo/src/invoker/clone_invoker.rs @@ -1,76 +1,81 @@ -use std::{task::Poll, pin::Pin, mem}; +use std::{mem, pin::Pin, task::Poll}; use dubbo_logger::tracing::debug; -use futures_core::{Future, TryFuture, ready, future::BoxFuture}; +use futures_core::{future::BoxFuture, ready, Future, TryFuture}; use futures_util::FutureExt; use pin_project::pin_project; use thiserror::Error; -use tokio::{task::JoinHandle, sync::{watch::{Sender, Receiver}, self}}; +use tokio::{ + sync::{ + self, + watch::{Receiver, Sender}, + }, + task::JoinHandle, +}; use tokio_util::sync::ReusableBoxFuture; -use tower::{ServiceExt, buffer::Buffer}; +use tower::{buffer::Buffer, ServiceExt}; use tower_service::Service; use crate::StdError; use super::clone_body::CloneBody; - + enum Inner<S> { Invalid, Ready(S), Pending(JoinHandle<Result<S, (S, StdError)>>), -} +} #[derive(Debug, Error)] #[error("the inner service has not got ready yet!")] struct InnerServiceNotReadyErr; - - #[pin_project(project = InnerServiceCallingResponseProj)] enum InnerServiceCallingResponse<Fut> { Call(#[pin] Fut), - Fail + Fail, } -impl<Fut> Future for InnerServiceCallingResponse<Fut> +impl<Fut> Future for InnerServiceCallingResponse<Fut> where Fut: TryFuture, - Fut::Error: Into<StdError> + Fut::Error: Into<StdError>, { type Output = Result<Fut::Ok, StdError>; - + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { - match self.project() { - InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into), - InnerServiceCallingResponseProj::Fail => Poll::Ready(Err(InnerServiceNotReadyErr.into())) - } + match self.project() { + InnerServiceCallingResponseProj::Call(call) => call.try_poll(cx).map_err(Into::into), + InnerServiceCallingResponseProj::Fail => { + Poll::Ready(Err(InnerServiceNotReadyErr.into())) + } + } } } - + #[derive(Clone)] enum ObserveState { Ready, Pending, } - struct ReadyService<S> { inner: Inner<S>, - tx: Sender<ObserveState> + tx: Sender<ObserveState>, } - impl<S> ReadyService<S> { - fn new(inner: S) -> (Self, Receiver<ObserveState>) { let (tx, rx) = sync::watch::channel(ObserveState::Ready); - let ready_service = Self { inner: Inner::Ready(inner), tx}; + let ready_service = Self { + inner: Inner::Ready(inner), + tx, + }; (ready_service, rx) } - } -impl<S, Req> Service<Req> for ReadyService<S> +impl<S, Req> Service<Req> for ReadyService<S> where S: Service<Req> + Send + 'static, <S as Service<Req>>::Error: Into<StdError>, @@ -85,16 +90,14 @@ where loop { match mem::replace(&mut self.inner, Inner::Invalid) { Inner::Ready(mut svc) => { - let poll_ready = svc.poll_ready(cx); + let poll_ready = svc.poll_ready(cx); match poll_ready { Poll::Pending => { self.inner = Inner::Pending(tokio::spawn(async move { let poll_ready = svc.ready().await; match poll_ready { Ok(_) => Ok(svc), - Err(err) => { - Err((svc, err.into())) - } + Err(err) => Err((svc, err.into())), } })); @@ -108,17 +111,17 @@ where return Poll::Ready(ret.map_err(Into::into)); } } - }, + } Inner::Pending(mut join_handle) => { if let Poll::Ready(res) = join_handle.poll_unpin(cx) { let (svc, res) = match res { Err(join_err) => panic!("ReadyService panicked: {join_err}"), Ok(Err((svc, err))) => (svc, Poll::Ready(Err(err))), - Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))) + Ok(Ok(svc)) => (svc, Poll::Ready(Ok(()))), }; self.inner = Inner::Ready(svc); - + let _ = self.tx.send(ObserveState::Ready); return res; } else { @@ -127,31 +130,29 @@ where let _ = self.tx.send(ObserveState::Pending); return Poll::Pending; } - - }, - Inner::Invalid => panic!("ReadyService panicked: inner state is invalid") + } + Inner::Invalid => panic!("ReadyService panicked: inner state is invalid"), } } } fn call(&mut self, req: Req) -> Self::Future { - match self.inner { - Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)), - _ => InnerServiceCallingResponse::Fail - } + match self.inner { + Inner::Ready(ref mut svc) => InnerServiceCallingResponse::Call(svc.call(req)), + _ => InnerServiceCallingResponse::Fail, + } } } - impl<S> Drop for ReadyService<S> { fn drop(&mut self) { - if let Inner::Pending(ref handler) = self.inner { + if let Inner::Pending(ref handler) = self.inner { handler.abort(); - } + } } } -pub struct CloneInvoker<Inv> +pub struct CloneInvoker<Inv> where Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, @@ -163,40 +164,44 @@ where polling: bool, } -impl<Inv> CloneInvoker<Inv> +impl<Inv> CloneInvoker<Inv> where Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, { - const MAX_INVOKER_BUFFER_SIZE: usize = 16; - + pub fn new(invoker: Inv) -> Self { - let (ready_service, rx) = ReadyService::new(invoker); - let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> = Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE); + let buffer: Buffer<ReadyService<Inv>, http::Request<CloneBody>> = + Buffer::new(ready_service, Self::MAX_INVOKER_BUFFER_SIZE); - Self { inner: buffer, rx, polling: false, poll: ReusableBoxFuture::new(futures::future::pending()) } + Self { + inner: buffer, + rx, + polling: false, + poll: ReusableBoxFuture::new(futures::future::pending()), + } } } -impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv> +impl<Inv> Service<http::Request<CloneBody>> for CloneInvoker<Inv> where Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, { type Response = Inv::Response; - + type Error = StdError; - + type Future = BoxFuture<'static, Result<Self::Response, StdError>>; fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { loop { - if !self.polling { + if !self.polling { match self.rx.borrow().clone() { ObserveState::Ready => return self.inner.poll_ready(cx), ObserveState::Pending => { @@ -206,7 +211,7 @@ where loop { let current_state = rx.borrow_and_update().clone(); if matches!(current_state, ObserveState::Ready) { - return current_state; + return current_state; } if let Err(_) = rx.changed().await { debug!("the readyService has already shutdown!"); @@ -216,7 +221,6 @@ where }); } } - } let state = ready!(self.poll.poll_unpin(cx)); @@ -235,14 +239,18 @@ where } } - -impl<Inv> Clone for CloneInvoker<Inv> +impl<Inv> Clone for CloneInvoker<Inv> where Inv: Service<http::Request<CloneBody>> + Send + 'static, Inv::Error: Into<StdError> + Send + Sync + 'static, Inv::Future: Send, { fn clone(&self) -> Self { - Self { inner: self.inner.clone(), rx: self.rx.clone(), polling: false, poll: ReusableBoxFuture::new(futures::future::pending())} + Self { + inner: self.inner.clone(), + rx: self.rx.clone(), + polling: false, + poll: ReusableBoxFuture::new(futures::future::pending()), + } } -} \ No newline at end of file +} diff --git a/dubbo/src/invoker/mod.rs b/dubbo/src/invoker/mod.rs index a8179ee..92b8b46 100644 --- a/dubbo/src/invoker/mod.rs +++ b/dubbo/src/invoker/mod.rs @@ -1,14 +1,12 @@ use dubbo_base::Url; -use crate::{codegen::TripleInvoker, svc::NewService, invoker::clone_invoker::CloneInvoker}; +use crate::{codegen::TripleInvoker, invoker::clone_invoker::CloneInvoker, svc::NewService}; pub mod clone_body; pub mod clone_invoker; - pub struct NewInvoker; - impl NewService<String> for NewInvoker { type Service = CloneInvoker<TripleInvoker>; @@ -18,4 +16,4 @@ impl NewService<String> for NewInvoker { let url = Url::from_url(&url).unwrap(); CloneInvoker::new(TripleInvoker::new(url)) } -} \ No newline at end of file +} diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index 252e01c..d397b42 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -18,20 +18,20 @@ pub mod cluster; pub mod codegen; pub mod context; +pub mod directory; pub mod filter; mod framework; pub mod invocation; +pub mod invoker; +pub mod loadbalancer; +pub mod param; pub mod protocol; pub mod registry; +pub mod route; pub mod status; +pub mod svc; pub mod triple; pub mod utils; -pub mod directory; -pub mod route; -pub mod loadbalancer; -pub mod invoker; -pub mod param; -pub mod svc; use http_body::Body; use std::{future::Future, pin::Pin}; diff --git a/dubbo/src/loadbalancer/mod.rs b/dubbo/src/loadbalancer/mod.rs index 334350e..4e26781 100644 --- a/dubbo/src/loadbalancer/mod.rs +++ b/dubbo/src/loadbalancer/mod.rs @@ -1,16 +1,20 @@ use futures_core::future::BoxFuture; -use tower::ServiceExt; -use tower::discover::ServiceList; +use tower::{discover::ServiceList, ServiceExt}; use tower_service::Service; -use crate::invoker::clone_body::CloneBody; -use crate::{codegen::RpcInvocation, StdError, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker}; +use crate::{ + codegen::RpcInvocation, + invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker}, + param::Param, + svc::NewService, + StdError, +}; use crate::protocol::triple::triple_invoker::TripleInvoker; - + pub struct NewLoadBalancer<N> { inner: N, -} +} #[derive(Clone)] pub struct LoadBalancer<S> { @@ -18,80 +22,74 @@ pub struct LoadBalancer<S> { } impl<N> NewLoadBalancer<N> { - - pub fn layer() -> impl tower_layer::Layer<N, Service = Self>{ - + pub fn layer() -> impl tower_layer::Layer<N, Service = Self> { tower_layer::layer_fn(|inner| { - NewLoadBalancer { - inner // NewRoutes + inner, // NewRoutes } }) } } -impl<N, T> NewService<T> for NewLoadBalancer<N> +impl<N, T> NewService<T> for NewLoadBalancer<N> where T: Param<RpcInvocation> + Clone, // NewRoutes - N: NewService<T>, + N: NewService<T>, { - type Service = LoadBalancer<N::Service>; fn new_service(&self, target: T) -> Self::Service { // Routes service let svc = self.inner.new_service(target); - LoadBalancer { - inner: svc, - } - + LoadBalancer { inner: svc } } } -impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N> +impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N> where // Routes service N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone, N::Error: Into<StdError> + Send, - N::Future: Send + 'static, + N::Future: Send + 'static, { - type Response = <CloneInvoker<TripleInvoker> as Service<http::Request<CloneBody>>>::Response; type Error = StdError; type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> { + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), Self::Error>> { self.inner.poll_ready(cx).map_err(Into::into) - } fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { - let routes = self.inner.call(()); + let routes = self.inner.call(()); let fut = async move { let routes = routes.await; let routes: Vec<CloneInvoker<TripleInvoker>> = match routes { Err(e) => return Err(Into::<StdError>::into(e)), - Ok(routes) => routes + Ok(routes) => routes, }; - - let service_list: Vec<_> = routes.into_iter().map(|invoker| { - tower::load::Constant::new(invoker, 1) - }).collect(); + + let service_list: Vec<_> = routes + .into_iter() + .map(|invoker| tower::load::Constant::new(invoker, 1)) + .collect(); let service_list = ServiceList::new(service_list); - + let p2c = tower::balance::p2c::Balance::new(service_list); - p2c.oneshot(req).await }; - + Box::pin(fut) } } diff --git a/dubbo/src/param.rs b/dubbo/src/param.rs index b57f98e..bef5041 100644 --- a/dubbo/src/param.rs +++ b/dubbo/src/param.rs @@ -1,12 +1,9 @@ pub trait Param<T> { - fn param(&self) -> T; } - impl<T: ToOwned> Param<T::Owned> for T { - fn param(&self) -> T::Owned { - self.to_owned() + self.to_owned() } } diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 9d35a52..7dbf1f3 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -15,9 +15,7 @@ * limitations under the License. */ -use std::{ - task::{Context, Poll}, -}; +use std::task::{Context, Poll}; use async_trait::async_trait; use aws_smithy_http::body::SdkBody; diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index 71eae90..c8451e3 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -16,14 +16,17 @@ */ use dubbo_base::Url; -use http::{Uri, HeaderValue}; +use http::{HeaderValue, Uri}; use std::{ fmt::{Debug, Formatter}, str::FromStr, }; use tower_service::Service; -use crate::{triple::transport::{connection::Connection, self}, invoker::clone_body::CloneBody}; +use crate::{ + invoker::clone_body::CloneBody, + triple::transport::{self, connection::Connection}, +}; pub struct TripleInvoker { url: Url, @@ -47,18 +50,19 @@ impl Debug for TripleInvoker { } impl TripleInvoker { - pub fn map_request( - &self, - req: http::Request<CloneBody>, - ) -> http::Request<CloneBody> { - + pub fn map_request(&self, req: http::Request<CloneBody>) -> http::Request<CloneBody> { let (parts, body) = req.into_parts(); let path_and_query = parts.headers.get("path").unwrap().to_str().unwrap(); - let authority = self.url.clone().get_ip_port(); - - let uri = Uri::builder().scheme("http").authority(authority).path_and_query(path_and_query).build().unwrap(); + let authority = self.url.clone().get_ip_port(); + + let uri = Uri::builder() + .scheme("http") + .authority(authority) + .path_and_query(path_and_query) + .build() + .unwrap(); let mut req = hyper::Request::builder() .version(http::Version::HTTP_2) @@ -99,12 +103,12 @@ impl TripleInvoker { HeaderValue::from_static("dubbo-rust/0.1.0"), ); // if let Some(_encoding) = self.send_compression_encoding { - + // } req.headers_mut() - .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); - + .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); + req.headers_mut().insert( "grpc-accept-encoding", http::HeaderValue::from_static("gzip"), @@ -137,7 +141,10 @@ impl Service<http::Request<CloneBody>> for TripleInvoker { &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Result<(), Self::Error>> { - <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready(&mut self.conn, cx) + <transport::connection::Connection as Service<http::Request<CloneBody>>>::poll_ready( + &mut self.conn, + cx, + ) } fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { @@ -146,4 +153,3 @@ impl Service<http::Request<CloneBody>> for TripleInvoker { self.conn.call(req) } } - diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs index ab2297f..d8ff6ef 100644 --- a/dubbo/src/registry/mod.rs +++ b/dubbo/src/registry/mod.rs @@ -18,9 +18,9 @@ #![allow(unused_variables, dead_code, missing_docs)] pub mod integration; pub mod memory_registry; +pub mod n_registry; pub mod protocol; pub mod types; -pub mod n_registry; use std::{ fmt::{Debug, Formatter}, @@ -29,7 +29,6 @@ use std::{ use dubbo_base::Url; - pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>; pub trait Registry { fn register(&mut self, url: Url) -> Result<(), crate::StdError>; diff --git a/dubbo/src/registry/n_registry.rs b/dubbo/src/registry/n_registry.rs index 9b6dca5..928c9b3 100644 --- a/dubbo/src/registry/n_registry.rs +++ b/dubbo/src/registry/n_registry.rs @@ -2,19 +2,17 @@ use std::sync::Arc; use async_trait::async_trait; use dubbo_base::Url; -use tokio::sync::mpsc::{Receiver, channel}; +use tokio::sync::mpsc::{channel, Receiver}; use tower::discover::Change; - use crate::StdError; type DiscoverStream = Receiver<Result<Change<String, ()>, StdError>>; #[async_trait] pub trait Registry { - async fn register(&self, url: Url) -> Result<(), StdError>; - + async fn unregister(&self, url: Url) -> Result<(), StdError>; // todo service_name change to url @@ -25,31 +23,29 @@ pub trait Registry { #[derive(Clone)] pub struct ArcRegistry { - inner: Arc<dyn Registry + Send + Sync + 'static> + inner: Arc<dyn Registry + Send + Sync + 'static>, } - pub enum RegistryComponent { NacosRegistry, ZookeeperRegistry, StaticRegistry(StaticRegistry), } - pub struct StaticRegistry { - urls: Vec<Url> + urls: Vec<Url>, } impl ArcRegistry { - pub fn new(registry: impl Registry + Send + Sync + 'static) -> Self { - Self { inner: Arc::new(registry) } + Self { + inner: Arc::new(registry), + } } } #[async_trait] impl Registry for ArcRegistry { - async fn register(&self, url: Url) -> Result<(), StdError> { self.inner.register(url).await } @@ -67,9 +63,6 @@ impl Registry for ArcRegistry { } } - - - #[async_trait] impl Registry for RegistryComponent { async fn register(&self, url: Url) -> Result<(), StdError> { @@ -93,17 +86,12 @@ impl Registry for RegistryComponent { } } - impl StaticRegistry { - pub fn new(urls: Vec<Url>) -> Self { - Self { - urls - } + Self { urls } } } - #[async_trait] impl Registry for StaticRegistry { async fn register(&self, url: Url) -> Result<(), StdError> { @@ -119,7 +107,7 @@ impl Registry for StaticRegistry { for url in self.urls.iter() { let change = Ok(Change::Insert(url.to_url(), ())); tx.send(change).await?; - } + } Ok(rx) } @@ -127,4 +115,4 @@ impl Registry for StaticRegistry { async fn unsubscribe(&self, url: Url) -> Result<(), StdError> { todo!() } -} \ No newline at end of file +} diff --git a/dubbo/src/route/mod.rs b/dubbo/src/route/mod.rs index cff3cce..c244864 100644 --- a/dubbo/src/route/mod.rs +++ b/dubbo/src/route/mod.rs @@ -1,42 +1,51 @@ use std::pin::Pin; use dubbo_logger::tracing::debug; -use futures_core::{Future, ready}; +use futures_core::{ready, Future}; use futures_util::{future::Ready, FutureExt, TryFutureExt}; -use tower::{util::FutureService, buffer::Buffer}; +use tower::{buffer::Buffer, util::FutureService}; use tower_service::Service; - -use crate::{StdError, codegen::{RpcInvocation, TripleInvoker}, svc::NewService, param::Param, invoker::clone_invoker::CloneInvoker}; + +use crate::{ + codegen::{RpcInvocation, TripleInvoker}, + invoker::clone_invoker::CloneInvoker, + param::Param, + svc::NewService, + StdError, +}; pub struct NewRoutes<N> { inner: N, -} - +} pub struct NewRoutesFuture<S, T> { inner: RoutesFutureInnerState<S>, target: T, } - pub enum RoutesFutureInnerState<S> { Service(S), - Future(Pin<Box<dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>> + Send + 'static>>), + Future( + Pin< + Box< + dyn Future<Output = Result<Vec<CloneInvoker<TripleInvoker>>, StdError>> + + Send + + 'static, + >, + >, + ), Ready(Vec<CloneInvoker<TripleInvoker>>), } - #[derive(Clone)] pub struct Routes<T> { target: T, - invokers: Vec<CloneInvoker<TripleInvoker>> + invokers: Vec<CloneInvoker<TripleInvoker>>, } impl<N> NewRoutes<N> { pub fn new(inner: N) -> Self { - Self { - inner, - } + Self { inner } } } @@ -44,40 +53,39 @@ impl<N> NewRoutes<N> { const MAX_ROUTE_BUFFER_SIZE: usize = 16; pub fn layer() -> impl tower_layer::Layer<N, Service = Self> { - tower_layer::layer_fn(|inner: N| { - NewRoutes::new(inner) - }) + tower_layer::layer_fn(|inner: N| NewRoutes::new(inner)) } } - -impl<N, T> NewService<T> for NewRoutes<N> +impl<N, T> NewService<T> for NewRoutes<N> where - T: Param<RpcInvocation> + Clone + Send + Unpin + 'static, + T: Param<RpcInvocation> + Clone + Send + Unpin + 'static, // NewDirectory N: NewService<T>, - // Directory + // Directory N::Service: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin + Send + 'static, - <N::Service as Service<()>>::Error: Into<StdError>, + <N::Service as Service<()>>::Error: Into<StdError>, <N::Service as Service<()>>::Future: Send + 'static, -{ - - type Service = Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>; +{ + type Service = + Buffer<FutureService<NewRoutesFuture<<N as NewService<T>>::Service, T>, Routes<T>>, ()>; fn new_service(&self, target: T) -> Self::Service { let inner = self.inner.new_service(target.clone()); - Buffer::new(FutureService::new(NewRoutesFuture { - inner: RoutesFutureInnerState::Service(inner), - target, - }), Self::MAX_ROUTE_BUFFER_SIZE) + Buffer::new( + FutureService::new(NewRoutesFuture { + inner: RoutesFutureInnerState::Service(inner), + target, + }), + Self::MAX_ROUTE_BUFFER_SIZE, + ) } } - -impl<N, T> Future for NewRoutesFuture<N, T> +impl<N, T> Future for NewRoutesFuture<N, T> where - T: Param<RpcInvocation> + Clone + Unpin, + T: Param<RpcInvocation> + Clone + Unpin, // Directory N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Unpin, N::Error: Into<StdError>, @@ -85,8 +93,10 @@ where { type Output = Result<Routes<T>, StdError>; - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> { - + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { let this = self.get_mut(); loop { @@ -94,14 +104,14 @@ where RoutesFutureInnerState::Service(ref mut service) => { debug!("RoutesFutureInnerState::Service"); let _ = ready!(service.poll_ready(cx)).map_err(Into::into)?; - let fut = service.call(()).map_err(|e|e.into()).boxed(); + let fut = service.call(()).map_err(|e| e.into()).boxed(); this.inner = RoutesFutureInnerState::Future(fut); - }, + } RoutesFutureInnerState::Future(ref mut futures) => { debug!("RoutesFutureInnerState::Future"); let invokers = ready!(futures.as_mut().poll(cx))?; this.inner = RoutesFutureInnerState::Ready(invokers); - }, + } RoutesFutureInnerState::Ready(ref invokers) => { debug!("RoutesFutureInnerState::Ready"); let target = this.target.clone(); @@ -109,32 +119,32 @@ where invokers: invokers.clone(), target, })); - }, + } } } - } } - - -impl<T> Service<()> for Routes<T> +impl<T> Service<()> for Routes<T> where - T: Param<RpcInvocation> + Clone, -{ + T: Param<RpcInvocation> + Clone, +{ type Response = Vec<CloneInvoker<TripleInvoker>>; - + type Error = StdError; - - type Future = Ready<Result<Self::Response, Self::Error>>; - fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> { + type Future = Ready<Result<Self::Response, Self::Error>>; + + fn poll_ready( + &mut self, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), Self::Error>> { std::task::Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { // some router operator // if new_invokers changed, send new invokers to routes_rx after router operator - futures_util::future::ok(self.invokers.clone()) + futures_util::future::ok(self.invokers.clone()) } -} \ No newline at end of file +} diff --git a/dubbo/src/svc.rs b/dubbo/src/svc.rs index 56dc330..df4c071 100644 --- a/dubbo/src/svc.rs +++ b/dubbo/src/svc.rs @@ -1,23 +1,19 @@ -use std::{sync::Arc, pin::Pin, marker::PhantomData}; +use std::{marker::PhantomData, sync::Arc}; + + + -use futures_core::Future; -use tower::ServiceExt; -use tower_service::Service; pub trait NewService<T> { - type Service; fn new_service(&self, target: T) -> Self::Service; - } - pub struct ArcNewService<T, S> { inner: Arc<dyn NewService<T, Service = S> + Send + Sync>, } - impl<T, S> ArcNewService<T, S> { pub fn layer<N>() -> impl tower_layer::Layer<N, Service = Self> + Clone + Copy where @@ -57,23 +53,19 @@ impl<T, S> NewService<T> for ArcNewService<T, S> { // inner: Box<dyn Service<T, Response = U, Error = E, Future = Pin<Box<dyn Future<Output = Result<U, E>> + Send>>> + Send>, pub struct BoxedService<N, R> { inner: N, - _mark: PhantomData<R> + _mark: PhantomData<R>, } impl<R, N> BoxedService<N, R> { - - pub fn layer() -> impl tower_layer::Layer<N, Service = Self>{ - tower_layer::layer_fn(|inner: N| { - Self { - inner, - _mark: PhantomData - } + pub fn layer() -> impl tower_layer::Layer<N, Service = Self> { + tower_layer::layer_fn(|inner: N| Self { + inner, + _mark: PhantomData, }) } } - -// impl<T, R, N> NewService<T> for BoxedService<N, R> +// impl<T, R, N> NewService<T> for BoxedService<N, R> // where // N: NewService<T>, // N::Service: Service<R> + Send, diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index ed08021..c4e6e62 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -15,11 +15,15 @@ * limitations under the License. */ - use std::sync::Arc; use crate::{ - utils::boxed_clone::BoxCloneService, registry::n_registry::{RegistryComponent, StaticRegistry, ArcRegistry}, route::NewRoutes, loadbalancer::NewLoadBalancer, cluster::{NewCluster, Cluster}, directory::NewCachedDirectory, svc::{ArcNewService, NewService, BoxedService}, StdError, codegen::RpcInvocation, BoxBody, + cluster::NewCluster, + directory::NewCachedDirectory, + loadbalancer::NewLoadBalancer, + registry::n_registry::{ArcRegistry, RegistryComponent, StaticRegistry}, + route::NewRoutes, + utils::boxed_clone::BoxCloneService, }; use aws_smithy_http::body::SdkBody; @@ -29,7 +33,6 @@ use tower::ServiceBuilder; pub type ClientBoxService = BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>; - pub type ServiceMK = Arc<NewCluster<NewLoadBalancer<NewRoutes<NewCachedDirectory<ArcRegistry>>>>>; #[derive(Default)] @@ -56,7 +59,9 @@ impl ClientBuilder { Self { timeout: None, connector: "", - registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))), + registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry( + StaticRegistry::new(vec![Url::from_url(host).unwrap()]), + ))), direct: true, host: host.to_string(), } @@ -70,24 +75,23 @@ impl ClientBuilder { } pub fn with_registry(self, registry: RegistryComponent) -> Self { - Self { - registry: Some(ArcRegistry::new(registry)), + Self { + registry: Some(ArcRegistry::new(registry)), ..self } } pub fn with_host(self, host: &'static str) -> Self { Self { - registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry(StaticRegistry::new(vec![Url::from_url(host).unwrap()])))), + registry: Some(ArcRegistry::new(RegistryComponent::StaticRegistry( + StaticRegistry::new(vec![Url::from_url(host).unwrap()]), + ))), ..self } } pub fn with_connector(self, connector: &'static str) -> Self { - Self { - connector, - ..self - } + Self { connector, ..self } } pub fn with_direct(self, direct: bool) -> Self { @@ -95,16 +99,14 @@ impl ClientBuilder { } pub fn build(mut self) -> ServiceMK { - - let registry = self.registry.take().expect("registry must not be empty"); let mk_service = ServiceBuilder::new() - .layer(NewCluster::layer()) - .layer(NewLoadBalancer::layer()) - .layer(NewRoutes::layer()) - .layer(NewCachedDirectory::layer()) - .service(registry); + .layer(NewCluster::layer()) + .layer(NewLoadBalancer::layer()) + .layer(NewRoutes::layer()) + .layer(NewCachedDirectory::layer()) + .service(registry); Arc::new(mk_service) } diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 9bcf144..377dc43 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -15,7 +15,6 @@ * limitations under the License. */ - use futures_util::{future, stream, StreamExt, TryStreamExt}; use aws_smithy_http::body::SdkBody; @@ -25,9 +24,9 @@ use tower_service::Service; use super::builder::{ClientBuilder, ServiceMK}; use crate::codegen::RpcInvocation; -use crate::svc::NewService; use crate::{ invocation::{IntoStreamingRequest, Metadata, Request, Response}, + svc::NewService, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; @@ -148,15 +147,12 @@ impl TripleClient { .into_stream(); let body = hyper::Body::wrap_stream(body_stream); - let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - - + .body(body) + .unwrap(); let response = invoker .call(request) @@ -211,14 +207,13 @@ impl TripleClient { ) .into_stream(); let body = hyper::Body::wrap_stream(en); - - let mut invoker = self.mk.new_service(invocation); + let mut invoker = self.mk.new_service(invocation); let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - + .body(body) + .unwrap(); let response = invoker .call(request) @@ -259,12 +254,10 @@ impl TripleClient { let body = hyper::Body::wrap_stream(en); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - - + .body(body) + .unwrap(); // let mut conn = Connection::new().with_host(http_uri); let response = invoker @@ -322,11 +315,10 @@ impl TripleClient { let body = hyper::Body::wrap_stream(en); let mut invoker = self.mk.new_service(invocation); - let request = http::Request::builder() .header("path", path.to_string()) - .body(body).unwrap(); - + .body(body) + .unwrap(); let response = invoker .call(request) diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs index 3bbaa17..cb0b9d7 100644 --- a/dubbo/src/triple/transport/connection.rs +++ b/dubbo/src/triple/transport/connection.rs @@ -18,9 +18,15 @@ use hyper::client::{conn::Builder, service::Connect}; use tower_service::Service; -use crate::{boxed, triple::transport::connector::get_connector, StdError, invoker::clone_body::CloneBody}; +use crate::{ + boxed, invoker::clone_body::CloneBody, triple::transport::connector::get_connector, StdError, +}; -type HyperConnect = Connect<crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>, CloneBody, http::Uri>; +type HyperConnect = Connect< + crate::utils::boxed_clone::BoxCloneService<http::Uri, super::io::BoxIO, StdError>, + CloneBody, + http::Uri, +>; pub struct Connection { host: hyper::Uri, @@ -65,13 +71,10 @@ impl Connection { let hyper_connect: HyperConnect = Connect::new(get_connector(self.connector), builder); self.connect = Some(hyper_connect); self - } } impl Service<http::Request<CloneBody>> for Connection { - - type Response = http::Response<crate::BoxBody>; type Error = crate::Error; @@ -85,30 +88,28 @@ impl Service<http::Request<CloneBody>> for Connection { match self.connect { None => { panic!("connection must be built before use") - }, - Some(ref mut connect) => { - connect.poll_ready(cx).map_err(|e|e.into()) } + Some(ref mut connect) => connect.poll_ready(cx).map_err(|e| e.into()), } } fn call(&mut self, req: http::Request<CloneBody>) -> Self::Future { - match self.connect { None => { panic!("connection must be built before use") - }, + } Some(ref mut connect) => { let uri = self.host.clone(); let call_fut = connect.call(uri); let fut = async move { - let mut con = call_fut.await.unwrap(); - con.call(req).await - .map_err(|err| err.into()) - .map(|res| res.map(boxed)) + let mut con = call_fut.await.unwrap(); + con.call(req) + .await + .map_err(|err| err.into()) + .map(|res| res.map(boxed)) }; - return Box::pin(fut) + return Box::pin(fut); } } }