This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 6278851c14c61f8bab930b20338b6e859bd9b1e2
Author: yangyang <[email protected]>
AuthorDate: Fri Jul 29 00:19:09 2022 +0800

    refactor(triple): add router, support grpc compression, start multi server 
at the same time
---
 config/src/config.rs                         | 25 +++++++-
 config/src/service.rs                        | 11 ++--
 dubbo/src/echo/echo_client.rs                |  8 ++-
 dubbo/src/echo/echo_server.rs                |  4 +-
 dubbo/src/echo/mod.rs                        | 53 +++++++++++-----
 dubbo/src/helloworld/client.rs               |  4 +-
 dubbo/src/protocol/triple/mod.rs             |  2 -
 dubbo/src/protocol/triple/triple_protocol.rs |  5 +-
 dubbo/src/protocol/triple/triple_server.rs   |  3 +-
 triple/Cargo.toml                            |  1 +
 triple/src/client/grpc.rs                    | 29 +++++++--
 triple/src/lib.rs                            |  9 +++
 triple/src/main.rs                           | 36 -----------
 triple/src/server/compression.rs             | 82 ++++++++++++++++++++++++
 triple/src/server/consts.rs                  | 16 +++++
 triple/src/server/decode.rs                  | 53 ++++++++++++++--
 triple/src/server/encode.rs                  | 32 ++++++++--
 triple/src/server/mod.rs                     |  1 +
 triple/src/server/server.rs                  | 49 ++++++++++++++-
 triple/src/transport/mod.rs                  |  1 +
 triple/src/transport/router.rs               | 94 ++++++++++++++++++++++++++++
 triple/src/transport/service.rs              | 23 ++++---
 22 files changed, 439 insertions(+), 102 deletions(-)

diff --git a/config/src/config.rs b/config/src/config.rs
index 1adf7d3..e1ff02f 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -26,7 +26,8 @@ use super::service::ServiceConfig;
 #[derive(Debug, Default)]
 pub struct RootConfig {
     pub name: String,
-    pub service: ServiceConfig,
+    pub service: HashMap<String, ServiceConfig>,
+    pub protocols: HashMap<String, ProtocolConfig>,
     pub data: HashMap<String, Box<dyn any::Any>>,
 }
 
@@ -40,7 +41,8 @@ impl RootConfig {
     pub fn new() -> Self {
         Self {
             name: "dubbo".to_string(),
-            service: ServiceConfig::default(),
+            service: HashMap::new(),
+            protocols: HashMap::new(),
             data: HashMap::new(),
         }
     }
@@ -50,6 +52,7 @@ impl RootConfig {
             .group("test".to_string())
             .serializer("json".to_string())
             .version("1.0.0".to_string())
+            .protocol_names("triple".to_string())
             .name("echo".to_string());
 
         let triple_config = ProtocolConfig::default()
@@ -58,7 +61,23 @@ impl RootConfig {
             .port("8888".to_string());
 
         let service_config = 
service_config.add_protocol_configs(triple_config);
-        self.service = service_config;
+        self.service.insert("echo".to_string(), service_config);
+        self.service.insert(
+            "helloworld.Greeter".to_string(),
+            ServiceConfig::default()
+                .group("test".to_string())
+                .serializer("json".to_string())
+                .version("1.0.0".to_string())
+                .name("helloworld.Greeter".to_string())
+                .protocol_names("triple".to_string()),
+        );
+        self.protocols.insert(
+            "triple".to_string(),
+            ProtocolConfig::default()
+                .name("triple".to_string())
+                .ip("0.0.0.0".to_string())
+                .port("8889".to_string()),
+        );
         // 通过环境变量读取某个文件。加在到内存中
         self.data.insert(
             "dubbo.provider.url".to_string(),
diff --git a/config/src/service.rs b/config/src/service.rs
index aeda24b..146c041 100644
--- a/config/src/service.rs
+++ b/config/src/service.rs
@@ -24,8 +24,8 @@ pub struct ServiceConfig {
     pub version: String,
     pub group: String,
     pub name: String,
-    pub protocol_names: Vec<String>,
-    pub registry_names: Vec<String>,
+    pub protocol: String,
+    pub registry: String,
     pub serializer: String,
     pub protocol_configs: HashMap<String, ProtocolConfig>,
 }
@@ -43,11 +43,8 @@ impl ServiceConfig {
         Self { group, ..self }
     }
 
-    pub fn protocol_names(self, protocol_names: Vec<String>) -> Self {
-        Self {
-            protocol_names,
-            ..self
-        }
+    pub fn protocol_names(self, protocol: String) -> Self {
+        Self { protocol, ..self }
     }
 
     pub fn serializer(self, serializer: String) -> Self {
diff --git a/dubbo/src/echo/echo_client.rs b/dubbo/src/echo/echo_client.rs
index ebe7908..c89dad9 100644
--- a/dubbo/src/echo/echo_client.rs
+++ b/dubbo/src/echo/echo_client.rs
@@ -56,7 +56,7 @@ impl EchoClient {
             .bidi_streaming(
                 req,
                 codec,
-                http::uri::PathAndQuery::from_static("/bidi_stream"),
+                http::uri::PathAndQuery::from_static("/echo/bidi_stream"),
             )
             .await
     }
@@ -67,7 +67,11 @@ impl EchoClient {
     ) -> Result<Response<HelloReply>, tonic::Status> {
         let codec = SerdeCodec::<HelloRequest, HelloReply>::default();
         self.inner
-            .unary(req, codec, http::uri::PathAndQuery::from_static("/hello"))
+            .unary(
+                req,
+                codec,
+                http::uri::PathAndQuery::from_static("/echo/hello"),
+            )
             .await
     }
 }
diff --git a/dubbo/src/echo/echo_server.rs b/dubbo/src/echo/echo_server.rs
index 96d89f3..a2ff575 100644
--- a/dubbo/src/echo/echo_server.rs
+++ b/dubbo/src/echo/echo_server.rs
@@ -101,7 +101,7 @@ where
     fn call(&mut self, req: http::Request<B>) -> Self::Future {
         let inner = self.inner.clone();
         match req.uri().path() {
-            "/hello" => {
+            "/echo/hello" => {
                 struct UnaryServer<T> {
                     inner: _Inner<T>,
                 }
@@ -127,7 +127,7 @@ where
 
                 Box::pin(fut)
             }
-            "/bidi_stream" => {
+            "/echo/bidi_stream" => {
                 struct StreamingServer<T> {
                     inner: _Inner<T>,
                 }
diff --git a/dubbo/src/echo/mod.rs b/dubbo/src/echo/mod.rs
index 4df0678..7d4ff83 100644
--- a/dubbo/src/echo/mod.rs
+++ b/dubbo/src/echo/mod.rs
@@ -100,10 +100,7 @@ async fn test_server() {
     triple::transport::DubboServer::new()
         .add_service(name.clone(), esi)
         .with_http2_keepalive_timeout(Duration::from_secs(60))
-        .serve(
-            name.clone(),
-            "0.0.0.0:8888".to_socket_addrs().unwrap().next().unwrap(),
-        )
+        .serve("0.0.0.0:8888".to_socket_addrs().unwrap().next().unwrap())
         .await
         .unwrap();
     // server.add_service(esi.into());
@@ -112,10 +109,12 @@ async fn test_server() {
 #[tokio::test]
 async fn test_triple_protocol() {
     use crate::common::url::Url;
+    use crate::protocol::triple::triple_exporter::TripleExporter;
     use crate::protocol::triple::triple_protocol::TripleProtocol;
     use crate::protocol::Protocol;
     use config::get_global_config;
-    use futures::join;
+    use futures::future;
+    use futures::Future;
 
     let conf = get_global_config();
     let server_name = "echo".to_string();
@@ -130,23 +129,45 @@ async fn test_triple_protocol() {
         crate::protocol::triple::TRIPLE_SERVICES
             .read()
             .unwrap()
-            .len()
+            .len(),
     );
 
     let mut urls = Vec::<Url>::new();
-    for (_, proto_conf) in conf.service.protocol_configs.iter() {
-        println!("{:?}", proto_conf);
-        let u = Url {
-            url: proto_conf.to_owned().to_url().clone(),
-            service_key: server_name.clone(),
-        };
+    let pro = Box::new(TripleProtocol::new());
+    let mut async_vec: Vec<Pin<Box<dyn Future<Output = TripleExporter> + 
Send>>> = Vec::new();
+    for (_, c) in conf.service.iter() {
+        let mut u = Url::default();
+        if c.protocol_configs.is_empty() {
+            u = Url {
+                url: conf
+                    .protocols
+                    .get(&c.protocol)
+                    .unwrap()
+                    .clone()
+                    .to_url()
+                    .clone(),
+                service_key: c.name.clone(),
+            };
+        } else {
+            u = Url {
+                url: c
+                    .protocol_configs
+                    .get(&c.protocol)
+                    .unwrap()
+                    .clone()
+                    .to_url()
+                    .clone(),
+                service_key: c.name.clone(),
+            }
+        }
+        println!("url: {:?}", u);
         urls.push(u.clone());
 
-        println!("triple server running, url: 0.0.0.0:8888, {:?}", u);
-        let pro = TripleProtocol::new();
-        let tri_fut = pro.export(u.clone());
-        let _res = join!(tri_fut);
+        let tri_fut = pro.clone().export(u.clone());
+        async_vec.push(tri_fut);
     }
+
+    let _res = future::join_all(async_vec).await;
 }
 
 #[allow(dead_code)]
diff --git a/dubbo/src/helloworld/client.rs b/dubbo/src/helloworld/client.rs
index 5fd571d..078c665 100644
--- a/dubbo/src/helloworld/client.rs
+++ b/dubbo/src/helloworld/client.rs
@@ -26,13 +26,13 @@ use dubbo::helloworld::helloworld::HelloRequest;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let mut client = GreeterClient::connect("http://[::1]:50051";).await?;
+    let client = GreeterClient::connect("http://127.0.0.1:8889";).await?;
 
     let request = tonic::Request::new(HelloRequest {
         name: "Tonic".into(),
     });
 
-    let response = client.say_hello(request).await?;
+    let response = client.send_gzip().accept_gzip().say_hello(request).await?;
 
     println!("RESPONSE={:?}", response);
 
diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs
index 0e84907..fed8bba 100644
--- a/dubbo/src/protocol/triple/mod.rs
+++ b/dubbo/src/protocol/triple/mod.rs
@@ -31,8 +31,6 @@ pub type GrpcBoxCloneService =
     BoxCloneService<http::Request<hyper::Body>, http::Response<BoxBody>, 
std::convert::Infallible>;
 
 lazy_static! {
-    // pub static ref DUBBO_GRPC_SERVICES: RwLock<HashMap<String, Box<dyn 
DubboGrpcService<GrpcInvoker> + Send + Sync + 'static>>> =
-    //     RwLock::new(HashMap::new());
     pub static ref TRIPLE_SERVICES: RwLock<HashMap<String, 
GrpcBoxCloneService>> =
         RwLock::new(HashMap::new());
 }
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs 
b/dubbo/src/protocol/triple/triple_protocol.rs
index ba517bb..0aa3537 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+use std::boxed::Box;
 use std::collections::HashMap;
 
 use async_trait::async_trait;
@@ -25,6 +26,7 @@ use super::triple_server::TripleServer;
 use crate::common::url::Url;
 use crate::protocol::Protocol;
 
+#[derive(Clone)]
 pub struct TripleProtocol {
     servers: HashMap<String, TripleServer>,
 }
@@ -57,8 +59,9 @@ impl Protocol for TripleProtocol {
         todo!()
     }
 
-    async fn export(self, url: Url) -> Self::Exporter {
+    async fn export(mut self, url: Url) -> Self::Exporter {
         let server = TripleServer::new(url.service_key.clone());
+        self.servers.insert(url.service_key.clone(), server.clone());
         server.serve(url.url.clone()).await;
 
         TripleExporter::new()
diff --git a/dubbo/src/protocol/triple/triple_server.rs 
b/dubbo/src/protocol/triple/triple_server.rs
index e49694b..207f46b 100644
--- a/dubbo/src/protocol/triple/triple_server.rs
+++ b/dubbo/src/protocol/triple/triple_server.rs
@@ -29,7 +29,7 @@ impl TripleServer {
     pub fn new(name: String) -> TripleServer {
         Self {
             name,
-            ..Default::default()
+            s: DubboServer::new(),
         }
     }
 
@@ -46,7 +46,6 @@ impl TripleServer {
 
         server
             .serve(
-                self.name,
                 uri.authority()
                     .unwrap()
                     .to_string()
diff --git a/triple/Cargo.toml b/triple/Cargo.toml
index df17abd..34962d7 100644
--- a/triple/Cargo.toml
+++ b/triple/Cargo.toml
@@ -29,6 +29,7 @@ serde_json = "1.0.82"
 serde = {version="1.0.138", features = ["derive"]}
 async-stream = "0.3"
 tokio-stream = "0.1"
+flate2 = "1.0"
 
 # for test in codegen
 config = {path = "../config"}
\ No newline at end of file
diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs
index 362a0f2..32ebb8d 100644
--- a/triple/src/client/grpc.rs
+++ b/triple/src/client/grpc.rs
@@ -22,6 +22,7 @@ use http::HeaderValue;
 
 use crate::codec::Codec;
 use crate::invocation::{IntoStreamingRequest, Request, Response};
+use crate::server::consts::CompressionEncoding;
 use crate::server::encode::encode;
 use crate::server::Streaming;
 
@@ -29,6 +30,7 @@ use crate::server::Streaming;
 pub struct TripleClient {
     host: Option<http::Uri>,
     inner: ConnectionPool,
+    send_compression_encoding: Option<CompressionEncoding>,
 }
 
 #[derive(Debug, Default, Clone)]
@@ -54,6 +56,7 @@ impl TripleClient {
         TripleClient {
             host: None,
             inner: ConnectionPool::new(),
+            send_compression_encoding: Some(CompressionEncoding::Gzip),
         }
     }
 
@@ -115,6 +118,10 @@ impl TripleClient {
             "tri-unit-info",
             HeaderValue::from_static("dubbo-rust/1.0.0"),
         );
+        if let Some(_encoding) = self.send_compression_encoding {
+            req.headers_mut()
+                .insert("grpc-encoding", 
http::HeaderValue::from_static("gzip"));
+        }
 
         // const (
         //     TripleContentType    = "application/grpc+proto"
@@ -143,7 +150,12 @@ impl TripleClient {
         M2: Send + Sync + 'static,
     {
         let req = req.map(|m| stream::once(future::ready(m)));
-        let body_stream = encode(codec.encoder(), 
req.into_inner().map(Ok)).into_stream();
+        let body_stream = encode(
+            codec.encoder(),
+            req.into_inner().map(Ok),
+            self.send_compression_encoding,
+        )
+        .into_stream();
         let body = hyper::Body::wrap_stream(body_stream);
 
         let req = self.map_request(path, body);
@@ -152,7 +164,9 @@ impl TripleClient {
 
         match response {
             Ok(v) => {
-                let resp = v.map(|body| Streaming::new(body, codec.decoder()));
+                let resp = v.map(|body| {
+                    Streaming::new(body, codec.decoder(), 
self.send_compression_encoding)
+                });
                 let (mut parts, body) = Response::from_http(resp).into_parts();
 
                 futures_util::pin_mut!(body);
@@ -185,7 +199,12 @@ impl TripleClient {
         M2: Send + Sync + 'static,
     {
         let req = req.into_streaming_request();
-        let en = encode(codec.encoder(), 
req.into_inner().map(Ok)).into_stream();
+        let en = encode(
+            codec.encoder(),
+            req.into_inner().map(Ok),
+            self.send_compression_encoding,
+        )
+        .into_stream();
         let body = hyper::Body::wrap_stream(en);
 
         let req = self.map_request(path, body);
@@ -195,7 +214,9 @@ impl TripleClient {
 
         match response {
             Ok(v) => {
-                let resp = v.map(|body| Streaming::new(body, codec.decoder()));
+                let resp = v.map(|body| {
+                    Streaming::new(body, codec.decoder(), 
self.send_compression_encoding)
+                });
 
                 Ok(Response::from_http(resp))
             }
diff --git a/triple/src/lib.rs b/triple/src/lib.rs
index 42a3c6d..ddb61a5 100644
--- a/triple/src/lib.rs
+++ b/triple/src/lib.rs
@@ -32,3 +32,12 @@ pub fn empty_body() -> BoxBody {
         .map_err(|err| match err {})
         .boxed_unsync()
 }
+
+pub(crate) fn boxed<B>(body: B) -> BoxBody
+where
+    B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
+    B::Error: Into<crate::Error>,
+{
+    body.map_err(|err| tonic::Status::new(tonic::Code::Internal, 
format!("{:?}", err.into())))
+        .boxed_unsync()
+}
diff --git a/triple/src/main.rs b/triple/src/main.rs
deleted file mode 100644
index 4350696..0000000
--- a/triple/src/main.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-pub mod client;
-pub mod codec;
-pub mod invocation;
-pub mod server;
-pub mod transport;
-
-use http_body::Body;
-
-pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
-pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, 
tonic::Status>;
-pub fn empty_body() -> BoxBody {
-    http_body::Empty::new()
-        .map_err(|err| match err {})
-        .boxed_unsync()
-}
-
-fn main() {
-    println!("hello, triple");
-}
diff --git a/triple/src/server/compression.rs b/triple/src/server/compression.rs
new file mode 100644
index 0000000..77f66a9
--- /dev/null
+++ b/triple/src/server/compression.rs
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use bytes::{Buf, BufMut, BytesMut};
+use flate2::read::{GzDecoder, GzEncoder};
+use flate2::Compression;
+
+use super::consts::CompressionEncoding;
+
+pub fn compress(
+    encoding: CompressionEncoding,
+    src: &mut BytesMut,
+    dst: &mut BytesMut,
+    len: usize,
+) -> Result<(), std::io::Error> {
+    dst.reserve(len);
+
+    match encoding {
+        CompressionEncoding::Gzip => {
+            let mut en = GzEncoder::new(src.reader(), Compression::default());
+
+            let mut dst_writer = dst.writer();
+
+            std::io::copy(&mut en, &mut dst_writer)?;
+        }
+    }
+
+    Ok(())
+}
+
+pub fn decompress(
+    encoding: CompressionEncoding,
+    src: &mut BytesMut,
+    dst: &mut BytesMut,
+    len: usize,
+) -> Result<(), std::io::Error> {
+    let capacity = len * 2;
+    dst.reserve(capacity);
+
+    match encoding {
+        CompressionEncoding::Gzip => {
+            let mut de = GzDecoder::new(src.reader());
+
+            let mut dst_writer = dst.writer();
+
+            std::io::copy(&mut de, &mut dst_writer)?;
+        }
+    }
+    Ok(())
+}
+
+#[test]
+fn test_compress() {
+    let mut src = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
+    src.put(&b"test compress"[..]);
+    let mut dst = BytesMut::new();
+    let len = src.len();
+    src.reserve(len);
+
+    compress(CompressionEncoding::Gzip, &mut src, &mut dst, len);
+    println!("src: {:?}, dst: {:?}", src, dst);
+
+    let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
+    let de_len = dst.len();
+    decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len);
+
+    println!("src: {:?}, dst: {:?}", dst, de_dst);
+}
diff --git a/triple/src/server/consts.rs b/triple/src/server/consts.rs
index 4898211..0e0f0fa 100644
--- a/triple/src/server/consts.rs
+++ b/triple/src/server/consts.rs
@@ -22,3 +22,19 @@ pub const HEADER_SIZE: usize =
     std::mem::size_of::<u8>() +
     // data length
     std::mem::size_of::<u32>();
+
+#[derive(Debug, Clone, Copy)]
+pub enum CompressionEncoding {
+    Gzip,
+}
+
+use lazy_static::lazy_static;
+use std::collections::HashMap;
+
+lazy_static! {
+    pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> 
= {
+        let mut v = HashMap::new();
+        v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip));
+        v
+    };
+}
diff --git a/triple/src/server/decode.rs b/triple/src/server/decode.rs
index 3c62aeb..321a4e6 100644
--- a/triple/src/server/decode.rs
+++ b/triple/src/server/decode.rs
@@ -23,6 +23,8 @@ use futures_util::{future, ready};
 use http_body::Body;
 use tonic::metadata::MetadataMap;
 
+use super::compression::decompress;
+use super::consts::CompressionEncoding;
 use crate::codec::{DecodeBuf, Decoder};
 
 type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, tonic::Status>;
@@ -33,17 +35,19 @@ pub struct Streaming<T> {
     decoder: Box<dyn Decoder<Item = T, Error = tonic::Status> + Send + 
'static>,
     buf: BytesMut,
     trailers: Option<MetadataMap>,
+    compress: Option<CompressionEncoding>,
+    decompress_buf: BytesMut,
 }
 
 #[derive(PartialEq)]
 enum State {
     ReadHeader,
-    ReadBody { len: usize },
+    ReadBody { len: usize, is_compressed: bool },
     Error,
 }
 
 impl<T> Streaming<T> {
-    pub fn new<B, D>(body: B, decoder: D) -> Self
+    pub fn new<B, D>(body: B, decoder: D, compress: 
Option<CompressionEncoding>) -> Self
     where
         B: Body + Send + 'static,
         B::Error: Into<crate::Error>,
@@ -58,6 +62,8 @@ impl<T> Streaming<T> {
             decoder: Box::new(decoder),
             buf: BytesMut::with_capacity(super::consts::BUFFER_SIZE),
             trailers: None,
+            compress,
+            decompress_buf: BytesMut::new(),
         }
     }
 
@@ -86,19 +92,54 @@ impl<T> Streaming<T> {
                 return Ok(None);
             }
 
-            let _is_compressed = self.buf.get_u8();
+            let is_compressed = match self.buf.get_u8() {
+                0 => false,
+                1 => {
+                    if self.compress.is_some() {
+                        true
+                    } else {
+                        return Err(tonic::Status::internal(
+                            "set compression flag, but no grpc-encoding 
specified",
+                        ));
+                    }
+                }
+                v => {
+                    return Err(tonic::Status::internal(format!(
+                        "receive message with compression flag{}, flag should 
be 0 or 1",
+                        v
+                    )))
+                }
+            };
             let len = self.buf.get_u32() as usize;
             self.buf.reserve(len as usize);
 
-            self.state = State::ReadBody { len }
+            self.state = State::ReadBody { len, is_compressed }
         }
 
-        if let State::ReadBody { len } = self.state {
+        if let State::ReadBody { len, is_compressed } = self.state {
             if self.buf.remaining() < len || self.buf.len() < len {
                 return Ok(None);
             }
 
-            let decoding_result = self.decoder.decode(&mut DecodeBuf::new(&mut 
self.buf, len));
+            let decoding_result = if is_compressed {
+                self.decompress_buf.clear();
+                if let Err(err) = decompress(
+                    self.compress.unwrap(),
+                    &mut self.buf,
+                    &mut self.decompress_buf,
+                    len,
+                ) {
+                    return Err(tonic::Status::internal(err.to_string()));
+                }
+
+                let decompress_len = self.decompress_buf.len();
+                self.decoder.decode(&mut DecodeBuf::new(
+                    &mut self.decompress_buf,
+                    decompress_len,
+                ))
+            } else {
+                self.decoder.decode(&mut DecodeBuf::new(&mut self.buf, len))
+            };
 
             return match decoding_result {
                 Ok(Some(r)) => {
diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs
index 9e4ebc1..dfa4ae7 100644
--- a/triple/src/server/encode.rs
+++ b/triple/src/server/encode.rs
@@ -23,10 +23,16 @@ use http_body::Body;
 use pin_project::pin_project;
 use tonic::Status;
 
+use super::compression::compress;
+use super::consts::CompressionEncoding;
 use crate::codec::{EncodeBuf, Encoder};
 
 #[allow(unused_must_use)]
-pub fn encode<E, B>(mut encoder: E, resp_body: B) -> impl TryStream<Ok = 
Bytes, Error = Status>
+pub fn encode<E, B>(
+    mut encoder: E,
+    resp_body: B,
+    compression_encoding: Option<CompressionEncoding>,
+) -> impl TryStream<Ok = Bytes, Error = Status>
 where
     E: Encoder<Error = Status>,
     B: Stream<Item = Result<E::Item, Status>>,
@@ -35,6 +41,11 @@ where
         let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
         futures_util::pin_mut!(resp_body);
 
+        let (enable_compress, mut uncompression_buf) = match 
compression_encoding {
+            Some(CompressionEncoding::Gzip) => (true, 
BytesMut::with_capacity(super::consts::BUFFER_SIZE)),
+            None => (false, BytesMut::new())
+        };
+
         loop {
             match resp_body.next().await {
                 Some(Ok(item)) => {
@@ -43,7 +54,18 @@ where
                     unsafe {
                         buf.advance_mut(super::consts::HEADER_SIZE);
                     }
-                    encoder.encode(item, &mut EncodeBuf::new(&mut 
buf)).map_err(|_e| tonic::Status::internal("encode error"));
+
+                    if enable_compress {
+                        uncompression_buf.clear();
+
+                        encoder.encode(item, &mut EncodeBuf::new(&mut 
uncompression_buf)).map_err(|_e| tonic::Status::internal("encode error"));
+
+                        let len = uncompression_buf.len();
+                        compress(compression_encoding.unwrap(), &mut 
uncompression_buf, &mut buf, len).map_err(|_| tonic::Status::internal("compress 
error"));
+                    } else {
+                        encoder.encode(item, &mut EncodeBuf::new(&mut 
buf)).map_err(|_e| tonic::Status::internal("encode error"));
+                    }
+
 
                     let len = buf.len() - super::consts::HEADER_SIZE;
                     {
@@ -64,24 +86,26 @@ where
 pub fn encode_server<E, B>(
     encoder: E,
     body: B,
+    compression_encoding: Option<CompressionEncoding>,
 ) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
 where
     E: Encoder<Error = Status>,
     B: Stream<Item = Result<E::Item, Status>>,
 {
-    let s = encode(encoder, body).into_stream();
+    let s = encode(encoder, body, compression_encoding).into_stream();
     EncodeBody::new_server(s)
 }
 
 pub fn encode_client<E, B>(
     encoder: E,
     body: B,
+    compression_encoding: Option<CompressionEncoding>,
 ) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
 where
     E: Encoder<Error = Status>,
     B: Stream<Item = E::Item>,
 {
-    let s = encode(encoder, body.map(Ok)).into_stream();
+    let s = encode(encoder, body.map(Ok), compression_encoding).into_stream();
     EncodeBody::new_client(s)
 }
 
diff --git a/triple/src/server/mod.rs b/triple/src/server/mod.rs
index 8be7239..6fd5d45 100644
--- a/triple/src/server/mod.rs
+++ b/triple/src/server/mod.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+pub mod compression;
 pub mod consts;
 pub mod decode;
 pub mod encode;
diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs
index 859f545..4512ddb 100644
--- a/triple/src/server/server.rs
+++ b/triple/src/server/server.rs
@@ -26,6 +26,9 @@ use crate::server::Streaming;
 use crate::BoxBody;
 use config::BusinessConfig;
 
+pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
+pub const GRPC_ENCODING: &str = "grpc-encoding";
+
 pub struct TripleServer<T> {
     codec: T,
 }
@@ -51,12 +54,30 @@ where
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
-        let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder()));
+        let encoding = 
req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap();
+        let compression = match super::consts::COMPRESSIONS.get(encoding) {
+            Some(val) => val.to_owned(),
+            None => {
+                let mut status = tonic::Status::unimplemented(format!(
+                    "grpc-accept-encoding: {} not support!",
+                    encoding
+                ));
+
+                status.metadata_mut().insert(
+                    GRPC_ACCEPT_ENCODING,
+                    
tonic::metadata::MetadataValue::from_static("gzip,identity"),
+                );
+
+                return status.to_http();
+            }
+        };
+
+        let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder(), compression));
 
         let resp = service.call(Request::from_http(req_stream)).await;
 
         let (mut parts, resp_body) = resp.unwrap().into_http().into_parts();
-        let resp_body = encode_server(self.codec.encoder(), resp_body);
+        let resp_body = encode_server(self.codec.encoder(), resp_body, 
compression);
 
         parts.headers.insert(
             http::header::CONTENT_TYPE,
@@ -76,7 +97,25 @@ where
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
-        let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder()));
+        let encoding = 
req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap();
+        let compression = match super::consts::COMPRESSIONS.get(encoding) {
+            Some(val) => val.to_owned(),
+            None => {
+                let mut status = tonic::Status::unimplemented(format!(
+                    "grpc-accept-encoding: {} not support!",
+                    encoding
+                ));
+
+                status.metadata_mut().insert(
+                    GRPC_ACCEPT_ENCODING,
+                    
tonic::metadata::MetadataValue::from_static("gzip,identity"),
+                );
+
+                return status.to_http();
+            }
+        };
+
+        let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder(), compression));
         let (parts, mut body) = Request::from_http(req_stream).into_parts();
         let msg = body
             .try_next()
@@ -90,12 +129,16 @@ where
         let resp_body = encode_server(
             self.codec.encoder(),
             stream::once(future::ready(resp_body)).map(Ok).into_stream(),
+            compression,
         );
 
         parts.headers.insert(
             http::header::CONTENT_TYPE,
             http::HeaderValue::from_static("application/grpc"),
         );
+        parts
+            .headers
+            .insert(GRPC_ENCODING, http::HeaderValue::from_static("gzip"));
         parts.status = http::StatusCode::OK;
         http::Response::from_parts(parts, BoxBody::new(resp_body))
     }
diff --git a/triple/src/transport/mod.rs b/triple/src/transport/mod.rs
index 1f772a6..296af45 100644
--- a/triple/src/transport/mod.rs
+++ b/triple/src/transport/mod.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+pub mod router;
 pub mod service;
 
 pub use service::DubboServer;
diff --git a/triple/src/transport/router.rs b/triple/src/transport/router.rs
new file mode 100644
index 0000000..a64306b
--- /dev/null
+++ b/triple/src/transport/router.rs
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::fmt;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use axum::Router;
+use futures_core::Future;
+use hyper::{Body, Request, Response};
+use pin_project::pin_project;
+use tower::ServiceExt;
+use tower_service::Service;
+
+use crate::BoxBody;
+
+#[derive(Debug, Clone, Default)]
+pub struct DubboRouter {
+    pub router: Router,
+}
+
+impl DubboRouter {
+    pub fn new() -> DubboRouter {
+        Self {
+            router: Router::new(),
+        }
+    }
+}
+
+impl DubboRouter {
+    pub fn add_service<S>(mut self, name: String, service: S) -> Self
+    where
+        S: Service<Request<Body>, Response = Response<BoxBody>, Error = 
std::convert::Infallible>
+            + Clone
+            + Send
+            + 'static,
+        S::Future: Send + 'static,
+    {
+        let svc = service.map_response(|res| res.map(axum::body::boxed));
+        // *{bubbo} represents wildcard router
+        self.router = self.router.route(&format!("/{}/*dubbo", name), svc);
+
+        self
+    }
+}
+
+impl Service<Request<Body>> for DubboRouter {
+    type Response = Response<BoxBody>;
+    type Error = crate::Error;
+    type Future = RoutesFuture;
+
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), 
Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        RoutesFuture(self.router.call(req))
+    }
+}
+
+#[pin_project]
+pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture<Body, 
std::convert::Infallible>);
+
+impl fmt::Debug for RoutesFuture {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_tuple("RoutesFuture").finish()
+    }
+}
+
+impl Future for RoutesFuture {
+    type Output = Result<Response<BoxBody>, crate::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match futures_util::ready!(self.project().0.poll(cx)) {
+            Ok(res) => Ok(res.map(crate::boxed)).into(),
+            Err(err) => match err {},
+        }
+    }
+}
diff --git a/triple/src/transport/service.rs b/triple/src/transport/service.rs
index fdd6ecb..c8a5b67 100644
--- a/triple/src/transport/service.rs
+++ b/triple/src/transport/service.rs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-use std::collections::HashMap;
 use std::future;
 use std::net::SocketAddr;
 use std::task::Poll;
@@ -26,12 +25,12 @@ use tokio::time::Duration;
 use tower::ServiceExt;
 use tower_service::Service;
 
+use super::router::DubboRouter;
 use crate::BoxBody;
 use config::BusinessConfig;
 use config::Config;
 
 type BoxService = tower::util::BoxService<Request<Body>, Response<BoxBody>, 
crate::Error>;
-type BoxCloneService = tower::util::BoxCloneService<Request<Body>, 
Response<BoxBody>, crate::Error>;
 
 #[derive(Default, Clone)]
 pub struct DubboServer {
@@ -42,7 +41,7 @@ pub struct DubboServer {
     max_frame_size: Option<u32>,
     http2_keepalive_interval: Option<Duration>,
     http2_keepalive_timeout: Option<Duration>,
-    services: HashMap<String, BoxCloneService>,
+    router: DubboRouter,
 }
 
 impl DubboServer {
@@ -103,7 +102,7 @@ impl DubboServer {
             http2_keepalive_interval: None,
             http2_keepalive_timeout: None,
             max_frame_size: None,
-            services: HashMap::new(),
+            router: DubboRouter::new(),
         }
     }
 }
@@ -111,19 +110,19 @@ impl DubboServer {
 impl DubboServer {
     pub fn add_service<S>(mut self, name: String, service: S) -> Self
     where
-        S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send 
+ 'static,
+        S: Service<Request<Body>, Response = Response<BoxBody>, Error = 
std::convert::Infallible>
+            + Clone
+            + Send
+            + 'static,
         S::Future: Send + 'static,
         S::Error: Into<crate::Error> + Send + 'static,
     {
-        self.services
-            .insert(name, service.map_err(|err| err.into()).boxed_clone());
-        Self { ..self }
+        self.router = self.router.add_service(name, service);
+        self
     }
 
-    pub async fn serve(self, name: String, addr: SocketAddr) -> Result<(), 
crate::Error> {
-        let svc = MakeSvc {
-            inner: self.services.get(&name).unwrap().clone(),
-        };
+    pub async fn serve(self, addr: SocketAddr) -> Result<(), crate::Error> {
+        let svc = MakeSvc { inner: self.router };
 
         let http2_keepalive_timeout = self
             .http2_keepalive_timeout

Reply via email to