This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2445333  Ftr: clone http body to support FailoverCluster (#112)
2445333 is described below

commit 2445333aa6116b5083087bce367be91807a8e6a6
Author: Yang Yang <[email protected]>
AuthorDate: Thu Feb 16 21:48:44 2023 +0800

    Ftr: clone http body to support FailoverCluster (#112)
    
    * Rft(dubbo/client): use sdkbody to clone http body
    
    * feat(cluster): failover impl for unary api
    
    * style: cargo fmt
---
 dubbo/Cargo.toml                            |  1 +
 dubbo/src/cluster/mod.rs                    | 84 +++++++++++++++++++++++++++++
 dubbo/src/protocol/mod.rs                   |  3 +-
 dubbo/src/protocol/triple/triple_invoker.rs | 29 ++++++----
 dubbo/src/triple/client/builder.rs          |  4 +-
 dubbo/src/triple/client/triple.rs           | 28 ++++++----
 examples/echo/src/echo/server.rs            |  2 +-
 7 files changed, 129 insertions(+), 22 deletions(-)

diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 6a51730..e319259 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -33,6 +33,7 @@ tracing-subscriber = "0.3.15"
 axum = "0.5.9"
 async-stream = "0.3"
 flate2 = "1.0"
+aws-smithy-http = "0.54.1"
 
 dubbo-config = {path = "../config", version = "0.2.0"}
 
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 84980ea..2221afc 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -15,4 +15,88 @@
  * limitations under the License.
  */
 
+use std::{sync::Arc, task::Poll};
+
+use aws_smithy_http::body::SdkBody;
+use tower_service::Service;
+
+use crate::{
+    common::url::Url,
+    empty_body,
+    protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker},
+};
+
 pub mod directory;
+
+pub trait Directory {
+    fn list(&self, meta: String) -> Vec<BoxInvoker>;
+    fn is_empty(&self) -> bool;
+}
+
+type BoxDirectory = Box<dyn Directory>;
+
+pub struct FailoverCluster {
+    dir: Arc<BoxDirectory>,
+}
+
+impl FailoverCluster {
+    pub fn new(dir: BoxDirectory) -> FailoverCluster {
+        Self { dir: Arc::new(dir) }
+    }
+}
+
+impl Service<http::Request<SdkBody>> for FailoverCluster {
+    type Response = http::Response<crate::BoxBody>;
+
+    type Error = crate::Error;
+
+    type Future = crate::BoxFuture<Self::Response, Self::Error>;
+
+    fn poll_ready(
+        &mut self,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), Self::Error>> {
+        // if self.dir.is_empty() return err
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
+        println!("req: {}", req.body().content_length().unwrap());
+        let clone_body = req.body().try_clone().unwrap();
+        let mut clone_req = http::Request::builder()
+            .uri(req.uri().clone())
+            .method(req.method().clone());
+        *clone_req.headers_mut().unwrap() = req.headers().clone();
+        let r = clone_req.body(clone_body).unwrap();
+        let invokers = self.dir.list("service_name".to_string());
+        for mut invoker in invokers {
+            let fut = async move {
+                let res = invoker.call(r).await;
+                return res;
+            };
+            return Box::pin(fut);
+        }
+        Box::pin(async move {
+            Ok(http::Response::builder()
+                .status(200)
+                .header("grpc-status", "12")
+                .header("content-type", "application/grpc")
+                .body(empty_body())
+                .unwrap())
+        })
+    }
+}
+
+pub struct MockDirectory {}
+
+impl Directory for MockDirectory {
+    fn list(&self, meta: String) -> Vec<BoxInvoker> {
+        tracing::info!("MockDirectory: {}", meta);
+        let u = 
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
+        vec![Box::new(TripleInvoker::new(u))]
+    }
+
+    fn is_empty(&self) -> bool {
+        false
+    }
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 9505dca..656da2a 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -24,6 +24,7 @@ use std::{
 };
 
 use async_trait::async_trait;
+use aws_smithy_http::body::SdkBody;
 use tower_service::Service;
 
 use crate::common::url::Url;
@@ -58,7 +59,7 @@ pub trait Invoker<ReqBody> {
 pub type BoxExporter = Box<dyn Exporter + Send + Sync>;
 pub type BoxInvoker = Box<
     dyn Invoker<
-            http::Request<hyper::Body>,
+            http::Request<SdkBody>,
             Response = http::Response<crate::BoxBody>,
             Error = crate::Error,
             Future = crate::BoxFuture<http::Response<crate::BoxBody>, 
crate::Error>,
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs 
b/dubbo/src/protocol/triple/triple_invoker.rs
index 4c411d1..df4d2ae 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -15,9 +15,17 @@
  * limitations under the License.
  */
 
+use std::str::FromStr;
+
+use aws_smithy_http::body::SdkBody;
 use tower_service::Service;
 
-use crate::{common::url::Url, protocol::Invoker, 
triple::client::builder::ClientBoxService};
+use crate::{
+    common::url::Url,
+    protocol::Invoker,
+    triple::{client::builder::ClientBoxService, 
transport::connection::Connection},
+    utils::boxed::BoxService,
+};
 
 pub struct TripleInvoker {
     url: Url,
@@ -25,16 +33,17 @@ pub struct TripleInvoker {
 }
 
 impl TripleInvoker {
-    // pub fn new(url: Url) -> TripleInvoker {
-    //     let uri = http::Uri::from_str(&url.to_url()).unwrap();
-    //     Self {
-    //         url,
-    //         conn: ClientBuilder::from_uri(&uri).build()connect(),
-    //     }
-    // }
+    pub fn new(url: Url) -> TripleInvoker {
+        let uri = http::Uri::from_str(&url.to_url()).unwrap();
+        let conn = Connection::new().with_host(uri);
+        Self {
+            url,
+            conn: BoxService::new(conn),
+        }
+    }
 }
 
-impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
+impl Invoker<http::Request<SdkBody>> for TripleInvoker {
     type Response = http::Response<crate::BoxBody>;
 
     type Error = crate::Error;
@@ -45,7 +54,7 @@ impl Invoker<http::Request<hyper::Body>> for TripleInvoker {
         self.url.clone()
     }
 
-    fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
+    fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
         self.conn.call(req)
     }
 
diff --git a/dubbo/src/triple/client/builder.rs 
b/dubbo/src/triple/client/builder.rs
index e19eb94..4746249 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -20,10 +20,12 @@ use crate::{
     triple::compression::CompressionEncoding, utils::boxed::BoxService,
 };
 
+use aws_smithy_http::body::SdkBody;
+
 use super::TripleClient;
 
 pub type ClientBoxService =
-    BoxService<http::Request<hyper::Body>, http::Response<crate::BoxBody>, 
crate::Error>;
+    BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>, 
crate::Error>;
 
 #[derive(Clone, Debug, Default)]
 pub struct ClientBuilder {
diff --git a/dubbo/src/triple/client/triple.rs 
b/dubbo/src/triple/client/triple.rs
index 3c6172d..454e953 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -19,12 +19,16 @@ use std::str::FromStr;
 
 use futures_util::{future, stream, StreamExt, TryStreamExt};
 
+use aws_smithy_http::body::SdkBody;
 use http::HeaderValue;
 use rand::prelude::SliceRandom;
 use tower_service::Service;
 
 use super::{super::transport::connection::Connection, builder::ClientBuilder};
-use crate::codegen::{Directory, RpcInvocation};
+use crate::{
+    cluster::{FailoverCluster, MockDirectory},
+    codegen::{Directory, RpcInvocation},
+};
 
 use crate::{
     invocation::{IntoStreamingRequest, Metadata, Request, Response},
@@ -52,8 +56,8 @@ impl TripleClient {
         &self,
         uri: http::Uri,
         path: http::uri::PathAndQuery,
-        body: hyper::Body,
-    ) -> http::Request<hyper::Body> {
+        body: SdkBody,
+    ) -> http::Request<SdkBody> {
         let mut parts = uri.into_parts();
         parts.path_and_query = Some(path);
 
@@ -140,16 +144,19 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
+        let bytes = hyper::body::to_bytes(body).await.unwrap();
+        let sdk_body = SdkBody::from(bytes);
 
         let url_list = self.directory.as_ref().expect("msg").list(invocation);
         let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
         let http_uri =
             http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
 
-        let req = self.map_request(http_uri.clone(), path, body);
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
 
-        let mut conn = Connection::new().with_host(http_uri);
-        let response = conn
+        // let mut conn = Connection::new().with_host(http_uri);
+        let mut cluster = FailoverCluster::new(Box::new(MockDirectory {}));
+        let response = cluster
             .call(req)
             .await
             .map_err(|err| crate::status::Status::from_error(err.into()));
@@ -202,13 +209,14 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
+        let sdk_body = SdkBody::from(body);
 
         let url_list = self.directory.as_ref().expect("msg").list(invocation);
         let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
         let http_uri =
             http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
 
-        let req = self.map_request(http_uri.clone(), path, body);
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
 
         let mut conn = Connection::new().with_host(http_uri);
         let response = conn
@@ -248,13 +256,14 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
+        let sdk_body = SdkBody::from(body);
 
         let url_list = self.directory.as_ref().expect("msg").list(invocation);
         let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
         let http_uri =
             http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
 
-        let req = self.map_request(http_uri.clone(), path, body);
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
 
         let mut conn = Connection::new().with_host(http_uri);
         let response = conn
@@ -310,13 +319,14 @@ impl TripleClient {
         )
         .into_stream();
         let body = hyper::Body::wrap_stream(en);
+        let sdk_body = SdkBody::from(body);
 
         let url_list = self.directory.as_ref().expect("msg").list(invocation);
         let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
         let http_uri =
             http::Uri::from_str(&format!("http://{}:{}/";, real_url.ip, 
real_url.port)).unwrap();
 
-        let req = self.map_request(http_uri.clone(), path, body);
+        let req = self.map_request(http_uri.clone(), path, sdk_body);
 
         let mut conn = Connection::new().with_host(http_uri);
         let response = conn
diff --git a/examples/echo/src/echo/server.rs b/examples/echo/src/echo/server.rs
index d7332b2..73c6da2 100644
--- a/examples/echo/src/echo/server.rs
+++ b/examples/echo/src/echo/server.rs
@@ -77,7 +77,7 @@ async fn main() {
 
     // 3. 通过serverbuilder来初始化Server
     let builder = ServerBuilder::new()
-        .with_listener("unix".to_string())
+        .with_listener("tcp".to_string())
         .with_service_names(vec!["grpc.examples.echo.Echo".to_string()])
         .with_addr("127.0.0.1:8888");
     builder.build().serve().await.unwrap();

Reply via email to