This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2445333 Ftr: clone http body to support FailoverCluster (#112)
2445333 is described below
commit 2445333aa6116b5083087bce367be91807a8e6a6
Author: Yang Yang <[email protected]>
AuthorDate: Thu Feb 16 21:48:44 2023 +0800
Ftr: clone http body to support FailoverCluster (#112)
* Rft(dubbo/client): use sdkbody to clone http body
* feat(cluster): failover impl for unary api
* style: cargo fmt
---
dubbo/Cargo.toml | 1 +
dubbo/src/cluster/mod.rs | 84 +++++++++++++++++++++++++++++
dubbo/src/protocol/mod.rs | 3 +-
dubbo/src/protocol/triple/triple_invoker.rs | 29 ++++++----
dubbo/src/triple/client/builder.rs | 4 +-
dubbo/src/triple/client/triple.rs | 28 ++++++----
examples/echo/src/echo/server.rs | 2 +-
7 files changed, 129 insertions(+), 22 deletions(-)
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 6a51730..e319259 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -33,6 +33,7 @@ tracing-subscriber = "0.3.15"
axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
+aws-smithy-http = "0.54.1"
dubbo-config = {path = "../config", version = "0.2.0"}
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 84980ea..2221afc 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,4 +15,88 @@
* limitations under the License.
*/
+use std::{sync::Arc, task::Poll};
+
+use aws_smithy_http::body::SdkBody;
+use tower_service::Service;
+
+use crate::{
+ common::url::Url,
+ empty_body,
+ protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker},
+};
+
pub mod directory;
+
+pub trait Directory {
+ fn list(&self, meta: String) -> Vec<BoxInvoker>;
+ fn is_empty(&self) -> bool;
+}
+
+type BoxDirectory = Box<dyn Directory>;
+
+pub struct FailoverCluster {
+ dir: Arc<BoxDirectory>,
+}
+
+impl FailoverCluster {
+ pub fn new(dir: BoxDirectory) -> FailoverCluster {
+ Self { dir: Arc::new(dir) }
+ }
+}
+
+impl Service<http::Request<SdkBody>> for FailoverCluster {
+ type Response = http::Response<crate::BoxBody>;
+
+ type Error = crate::Error;
+
+ type Future = crate::BoxFuture<Self::Response, Self::Error>;
+
+ fn poll_ready(
+ &mut self,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), Self::Error>> {
+ // if self.dir.is_empty() return err
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
+ println!("req: {}", req.body().content_length().unwrap());
+ let clone_body = req.body().try_clone().unwrap();
+ let mut clone_req = http::Request::builder()
+ .uri(req.uri().clone())
+ .method(req.method().clone());
+ *clone_req.headers_mut().unwrap() = req.headers().clone();
+ let r = clone_req.body(clone_body).unwrap();
+ let invokers = self.dir.list("service_name".to_string());
+ for mut invoker in invokers {
+ let fut = async move {
+ let res = invoker.call(r).await;
+ return res;
+ };
+ return Box::pin(fut);
+ }
+ Box::pin(async move {
+ Ok(http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap())
+ })
+ }
+}
+
+pub struct MockDirectory {}
+
+impl Directory for MockDirectory {
+ fn list(&self, meta: String) -> Vec<BoxInvoker> {
+ tracing::info!("MockDirectory: {}", meta);
+ let u =
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+ vec![Box::new(TripleInvoker::new(u))]
+ }
+
+ fn is_empty(&self) -> bool {
+ false
+ }
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 9505dca..656da2a 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -24,6 +24,7 @@ use std::{
};
use async_trait::async_trait;
+use aws_smithy_http::body::SdkBody;
use tower_service::Service;
use crate::common::url::Url;
@@ -58,7 +59,7 @@ pub trait Invoker<ReqBody> {
pub type BoxExporter = Box<dyn Exporter + Send + Sync>;
pub type BoxInvoker = Box<
dyn Invoker<
- http::Request<hyper::Body>,
+ http::Request<SdkBody>,
Response = http::Response<crate::BoxBody>,
Error = crate::Error,
Future = crate::BoxFuture<http::Response<crate::BoxBody>,
crate::Error>,
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs
b/dubbo/src/protocol/triple/triple_invoker.rs
index 4c411d1..df4d2ae 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,9 +15,17 @@
* limitations under the License.
*/
+use std::str::FromStr;
+
+use aws_smithy_http::body::SdkBody;
use tower_service::Service;
-use crate::{common::url::Url, protocol::Invoker,
triple::client::builder::ClientBoxService};
+use crate::{
+ common::url::Url,
+ protocol::Invoker,
+ triple::{client::builder::ClientBoxService,
transport::connection::Connection},
+ utils::boxed::BoxService,
+};
pub struct TripleInvoker {
url: Url,
@@ -25,16 +33,17 @@ pub struct TripleInvoker {
}
impl TripleInvoker {
- // pub fn new(url: Url) -> TripleInvoker {
- // let uri = http::Uri::from_str(&url.to_url()).unwrap();
- // Self {
- // url,
- // conn: ClientBuilder::from_uri(&uri).build()connect(),
- // }
- // }
+ pub fn new(url: Url) -> TripleInvoker {
+ let uri = http::Uri::from_str(&url.to_url()).unwrap();
+ let conn = Connection::new().with_host(uri);
+ Self {
+ url,
+ conn: BoxService::new(conn),
+ }
+ }
}
-impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
+impl Invoker<http::Request<SdkBody>> for TripleInvoker {
type Response = http::Response<crate::BoxBody>;
type Error = crate::Error;
@@ -45,7 +54,7 @@ impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
self.url.clone()
}
- fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
+ fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
self.conn.call(req)
}
diff --git a/dubbo/src/triple/client/builder.rs
b/dubbo/src/triple/client/builder.rs
index e19eb94..4746249 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -20,10 +20,12 @@ use crate::{
triple::compression::CompressionEncoding, utils::boxed::BoxService,
};
+use aws_smithy_http::body::SdkBody;
+
use super::TripleClient;
pub type ClientBoxService =
- BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>,
crate::Error>;
+ BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>,
crate::Error>;
#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
diff --git a/dubbo/src/triple/client/triple.rs
b/dubbo/src/triple/client/triple.rs
index 3c6172d..454e953 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -19,12 +19,16 @@ use std::str::FromStr;
use futures_util::{future, stream, StreamExt, TryStreamExt};
+use aws_smithy_http::body::SdkBody;
use http::HeaderValue;
use rand::prelude::SliceRandom;
use tower_service::Service;
use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::codegen::{Directory, RpcInvocation};
+use crate::{
+ cluster::{FailoverCluster, MockDirectory},
+ codegen::{Directory, RpcInvocation},
+};
use crate::{
invocation::{IntoStreamingRequest, Metadata, Request, Response},
@@ -52,8 +56,8 @@ impl TripleClient {
&self,
uri: http::Uri,
path: http::uri::PathAndQuery,
- body: hyper::Body,
- ) -> http::Request<hyper::Body> {
+ body: SdkBody,
+ ) -> http::Request<SdkBody> {
let mut parts = uri.into_parts();
parts.path_and_query = Some(path);
@@ -140,16 +144,19 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
+ let bytes = hyper::body::to_bytes(body).await.unwrap();
+ let sdk_body = SdkBody::from(bytes);
let url_list = self.directory.as_ref().expect("msg").list(invocation);
let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
let http_uri =
http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- let req = self.map_request(http_uri.clone(), path, body);
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
- let mut conn = Connection::new().with_host(http_uri);
- let response = conn
+ // let mut conn = Connection::new().with_host(http_uri);
+ let mut cluster = FailoverCluster::new(Box::new(MockDirectory {}));
+ let response = cluster
.call(req)
.await
.map_err(|err| crate::status::Status::from_error(err.into()));
@@ -202,13 +209,14 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
+ let sdk_body = SdkBody::from(body);
let url_list = self.directory.as_ref().expect("msg").list(invocation);
let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
let http_uri =
http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- let req = self.map_request(http_uri.clone(), path, body);
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
let mut conn = Connection::new().with_host(http_uri);
let response = conn
@@ -248,13 +256,14 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
+ let sdk_body = SdkBody::from(body);
let url_list = self.directory.as_ref().expect("msg").list(invocation);
let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
let http_uri =
http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- let req = self.map_request(http_uri.clone(), path, body);
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
let mut conn = Connection::new().with_host(http_uri);
let response = conn
@@ -310,13 +319,14 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(en);
+ let sdk_body = SdkBody::from(body);
let url_list = self.directory.as_ref().expect("msg").list(invocation);
let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
let http_uri =
http::Uri::from_str(&format!("http://{}:{}/", real_url.ip,
real_url.port)).unwrap();
- let req = self.map_request(http_uri.clone(), path, body);
+ let req = self.map_request(http_uri.clone(), path, sdk_body);
let mut conn = Connection::new().with_host(http_uri);
let response = conn
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index d7332b2..73c6da2 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -77,7 +77,7 @@ async fn main() {
// 3. 通过serverbuilder来初始化Server
let builder = ServerBuilder::new()
- .with_listener("unix".to_string())
+ .with_listener("tcp".to_string())
.with_service_names(vec!["grpc.examples.echo.Echo".to_string()])
.with_addr("127.0.0.1:8888");
builder.build().serve().await.unwrap();