This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit b6e165f1ef744e204f78427132ee5f069f2b9506
Author: yangyang <[email protected]>
AuthorDate: Thu Jul 21 21:36:44 2022 +0800

    refactor(triple): add prost codec, start server by config
---
 config/src/config.rs               |  26 +++-
 config/src/lib.rs                  |   2 +
 config/src/{lib.rs => protocol.rs} |  35 ++++-
 config/src/service.rs              |  74 ++++++++++
 dubbo/Cargo.toml                   |   1 +
 dubbo/src/echo/echo_client.rs      |  59 +++-----
 dubbo/src/echo/echo_server.rs      |  68 ++++++---
 dubbo/src/echo/helloworld.rs       | 296 +++++++++++++++++++++++++++++++++++++
 dubbo/src/echo/mod.rs              |  94 +++++++++---
 dubbo/src/lib.rs                   |   8 +-
 dubbo/src/main.rs                  |   8 +-
 triple/readme.md                   |  15 ++
 triple/src/client/grpc.rs          | 108 ++++++++++----
 triple/src/codec/mod.rs            |   1 +
 triple/src/codec/prost.rs          | 236 +++++++++++++++++++++++++++++
 triple/src/invocation.rs           |  23 +++
 triple/src/server/encode.rs        |   1 -
 triple/src/server/server.rs        |  68 +++------
 triple/src/transport/service.rs    |  11 +-
 19 files changed, 959 insertions(+), 175 deletions(-)

diff --git a/config/src/config.rs b/config/src/config.rs
index a53dcd2..1adf7d3 100644
--- a/config/src/config.rs
+++ b/config/src/config.rs
@@ -17,28 +17,48 @@
 
 use std::{any, collections::HashMap};
 
+use super::protocol::ProtocolConfig;
+use super::service::ServiceConfig;
+
 /// used to storage all structed config, from some source: cmd, file..;
 /// Impl Config trait, business init by read Config trait
 #[allow(dead_code)]
 #[derive(Debug, Default)]
 pub struct RootConfig {
-    name: String,
-    data: HashMap<String, Box<dyn any::Any>>,
+    pub name: String,
+    pub service: ServiceConfig,
+    pub data: HashMap<String, Box<dyn any::Any>>,
 }
 
 pub fn get_global_config() -> RootConfig {
-    RootConfig::new()
+    let mut c = RootConfig::new();
+    c.load();
+    c
 }
 
 impl RootConfig {
     pub fn new() -> Self {
         Self {
             name: "dubbo".to_string(),
+            service: ServiceConfig::default(),
             data: HashMap::new(),
         }
     }
 
     pub fn load(&mut self) {
+        let service_config = ServiceConfig::default()
+            .group("test".to_string())
+            .serializer("json".to_string())
+            .version("1.0.0".to_string())
+            .name("echo".to_string());
+
+        let triple_config = ProtocolConfig::default()
+            .name("triple".to_string())
+            .ip("0.0.0.0".to_string())
+            .port("8888".to_string());
+
+        let service_config = service_config.add_protocol_configs(triple_config);
+        self.service = service_config;
         // 通过环境变量读取某个文件。加在到内存中
         self.data.insert(
             "dubbo.provider.url".to_string(),
diff --git a/config/src/lib.rs b/config/src/lib.rs
index edbaf58..6237816 100644
--- a/config/src/lib.rs
+++ b/config/src/lib.rs
@@ -16,6 +16,8 @@
  */
 
 pub mod config;
+pub mod protocol;
+pub mod service;
 
 pub use config::*;
 
diff --git a/config/src/lib.rs b/config/src/protocol.rs
similarity index 54%
copy from config/src/lib.rs
copy to config/src/protocol.rs
index edbaf58..d139bed 100644
--- a/config/src/lib.rs
+++ b/config/src/protocol.rs
@@ -15,15 +15,34 @@
  * limitations under the License.
  */
 
-pub mod config;
+use std::collections::HashMap;
 
-pub use config::*;
+#[derive(Default, Debug, Clone)]
+pub struct ProtocolConfig {
+    pub ip: String,
+    pub port: String,
+    pub name: String,
+    pub params: HashMap<String, String>,
+}
+
+impl ProtocolConfig {
+    pub fn name(self, name: String) -> Self {
+        Self { name, ..self }
+    }
+
+    pub fn ip(self, ip: String) -> Self {
+        Self { ip, ..self }
+    }
+
+    pub fn port(self, port: String) -> Self {
+        Self { port, ..self }
+    }
+
+    pub fn params(self, params: HashMap<String, String>) -> Self {
+        Self { params, ..self }
+    }
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
+    pub fn to_url(self) -> String {
+        format!("{}://{}:{}", self.name, self.ip, self.port)
     }
 }
diff --git a/config/src/service.rs b/config/src/service.rs
new file mode 100644
index 0000000..aeda24b
--- /dev/null
+++ b/config/src/service.rs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+
+use super::protocol::ProtocolConfig;
+
+#[derive(Debug, Default)]
+pub struct ServiceConfig {
+    pub version: String,
+    pub group: String,
+    pub name: String,
+    pub protocol_names: Vec<String>,
+    pub registry_names: Vec<String>,
+    pub serializer: String,
+    pub protocol_configs: HashMap<String, ProtocolConfig>,
+}
+
+impl ServiceConfig {
+    pub fn name(self, name: String) -> Self {
+        Self { name, ..self }
+    }
+
+    pub fn version(self, version: String) -> Self {
+        Self { version, ..self }
+    }
+
+    pub fn group(self, group: String) -> Self {
+        Self { group, ..self }
+    }
+
+    pub fn protocol_names(self, protocol_names: Vec<String>) -> Self {
+        Self {
+            protocol_names,
+            ..self
+        }
+    }
+
+    pub fn serializer(self, serializer: String) -> Self {
+        Self { serializer, ..self }
+    }
+
+    pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) -> Self {
+        self.protocol_configs
+            .insert(protocol_config.name.clone(), protocol_config);
+        Self { ..self }
+    }
+
+    // pub fn get_url(&self) -> Vec<Url> {
+    //     let mut urls = Vec::new();
+    //     for (_, conf) in self.protocol_configs.iter() {
+    //         urls.push(Url {
+    //             url: conf.to_owned().to_url(),
+    //             service_key: "".to_string(),
+    //         });
+    //     }
+
+    //     urls
+    // }
+}
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 1a693ba..ce32cb4 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -31,6 +31,7 @@ pin-project = "1.0"
 serde_json = "1.0.82"
 serde = {version="1.0.138", features = ["derive"]}
 tokio-stream = "0.1"
+futures = "0.3"
 
 config = {path = "../config"}
 triple = {path = "../triple"}
\ No newline at end of file
diff --git a/dubbo/src/echo/echo_client.rs b/dubbo/src/echo/echo_client.rs
index 0f8215b..ebe7908 100644
--- a/dubbo/src/echo/echo_client.rs
+++ b/dubbo/src/echo/echo_client.rs
@@ -1,7 +1,21 @@
-use std::str::FromStr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 use super::echo_server::{HelloReply, HelloRequest};
-use bytes::Buf;
 
 use triple::client::TripleClient;
 use triple::codec::serde_codec::SerdeCodec;
@@ -28,10 +42,8 @@ impl EchoClient {
     }
 
     pub fn with_uri(mut self, uri: String) -> Self {
-        self.uri = uri;
-        self.inner = self
-            .inner
-            .with_authority(http::uri::Authority::from_str(&self.uri).unwrap());
+        self.uri = uri.clone();
+        self.inner = self.inner.with_host(uri);
         self
     }
 
@@ -53,36 +65,9 @@ impl EchoClient {
         &self,
         req: Request<HelloRequest>,
     ) -> Result<Response<HelloReply>, tonic::Status> {
-        let (_parts, body) = req.into_parts();
-        let v = serde_json::to_vec(&body).unwrap();
-        let req = hyper::Request::builder()
-            .uri("http://".to_owned() + &self.uri.clone() + "/hello")
-            .method("POST")
-            .body(hyper::Body::from(v))
-            .unwrap();
-
-        println!("request: {:?}", req);
-        let response = hyper::Client::builder().build_http().request(req).await;
-
-        match response {
-            Ok(v) => {
-                println!("{:?}", v);
-                let (_parts, body) = v.into_parts();
-                let req_body = hyper::body::to_bytes(body).await.unwrap();
-                let v = req_body.chunk();
-                // let codec = SerdeCodec::<HelloReply, HelloRequest>::default();
-                let data: HelloReply = match serde_json::from_slice(v) {
-                    Ok(data) => data,
-                    Err(err) => {
-                        return Err(tonic::Status::new(tonic::Code::Internal, err.to_string()))
-                    }
-                };
-                Ok(Response::new(data))
-            }
-            Err(err) => {
-                println!("{}", err);
-                Err(tonic::Status::new(tonic::Code::Internal, err.to_string()))
-            }
-        }
+        let codec = SerdeCodec::<HelloRequest, HelloReply>::default();
+        self.inner
+            .unary(req, codec, http::uri::PathAndQuery::from_static("/hello"))
+            .await
     }
 }
diff --git a/dubbo/src/echo/echo_server.rs b/dubbo/src/echo/echo_server.rs
index d6e9476..96d89f3 100644
--- a/dubbo/src/echo/echo_server.rs
+++ b/dubbo/src/echo/echo_server.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use async_trait::async_trait;
 
 use tonic::codegen::BoxFuture;
@@ -50,7 +67,6 @@ pub trait Echo: Send + Sync + 'static {
 
 struct _Inner<T>(Arc<T>);
 
-#[derive(Clone)]
 pub struct EchoServer<T, I = TripleInvoker> {
     inner: _Inner<T>,
     invoker: Option<I>,
@@ -141,23 +157,18 @@ where
 
                 Box::pin(fut)
             }
-            _ => {
-                Box::pin(async move {
-                    Ok(http::Response::builder()
-                        .status(200)
-                        .header("grpc-status", "12")
-                        .header("content-type", "application/grpc")
-                        // .body(hyper::Body::from("implement...").map_err(|err| match err {}).boxed())
-                        // .body(hyper::Body::from("implement...").map_err(|err| std::convert::Infallible).into())
-                        // .body(req.into_body())
-                        .body(
-                            http_body::Empty::new()
-                                .map_err(|err| match err {})
-                                .boxed_unsync(),
-                        )
-                        .unwrap())
-                })
-            }
+            _ => Box::pin(async move {
+                Ok(http::Response::builder()
+                    .status(200)
+                    .header("grpc-status", "12")
+                    .header("content-type", "application/grpc")
+                    .body(
+                        http_body::Empty::new()
+                            .map_err(|err| match err {})
+                            .boxed_unsync(),
+                    )
+                    .unwrap())
+            }),
         }
     }
 }
@@ -186,3 +197,24 @@ impl<T: Debug> Debug for _Inner<T> {
         write!(f, "Inner {:?}", self.0)
     }
 }
+
+impl<T: Echo, I: Invoker + Send + Sync + 'static> Clone for EchoServer<T, I> {
+    fn clone(&self) -> Self {
+        let inner = self.inner.clone();
+        Self {
+            inner,
+            invoker: None,
+        }
+    }
+}
+
+pub fn register_echo_server<T: Echo>(server: T) {
+    let s = EchoServer::<_, TripleInvoker>::new(server);
+    crate::protocol::triple::TRIPLE_SERVICES
+        .write()
+        .unwrap()
+        .insert(
+            "echo".to_string(),
+            crate::utils::boxed_clone::BoxCloneService::new(s),
+        );
+}
diff --git a/dubbo/src/echo/helloworld.rs b/dubbo/src/echo/helloworld.rs
new file mode 100644
index 0000000..9497f4b
--- /dev/null
+++ b/dubbo/src/echo/helloworld.rs
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+
+
+
+/// The request message containing the user's name.
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HelloRequest {
+    #[prost(string, tag = "1")]
+    pub name: ::prost::alloc::string::String,
+}
+/// The response message containing the greetings
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HelloReply {
+    #[prost(string, tag = "1")]
+    pub message: ::prost::alloc::string::String,
+}
+/// Generated client implementations.
+pub mod greeter_client {
+    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+    use triple::client::TripleClient;
+    use triple::codec::prost::ProstCodec;
+    use triple::invocation::*;
+    
+    /// The greeting service definition.
+    #[derive(Debug, Clone)]
+    pub struct GreeterClient {
+        inner: TripleClient,
+        uri: String,
+    }
+    // impl GreeterClient<tonic::transport::Channel> {
+    //     /// Attempt to create a new client by connecting to a given endpoint.
+    //     pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
+    //     where
+    //         D: std::convert::TryInto<tonic::transport::Endpoint>,
+    //         D::Error: Into<StdError>,
+    //     {
+    //         let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
+    //         Ok(Self::new(conn))
+    //     }
+    // }
+    impl GreeterClient {
+        pub fn new() -> Self {
+            Self {
+                inner: TripleClient::new(),
+                uri: "".to_string(),
+            }
+        }
+        // pub fn with_interceptor<F>(
+        //     inner: T,
+        //     interceptor: F,
+        // ) -> GreeterClient<InterceptedService<T, F>>
+        // where
+        //     F: tonic::service::Interceptor,
+        //     T::ResponseBody: Default,
+        //     T: tonic::codegen::Service<
+        //         http::Request<tonic::body::BoxBody>,
+        //         Response = http::Response<
+        //             <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+        //         >,
+        //     >,
+        //     <T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
+        //         Into<StdError> + Send + Sync,
+        // {
+        //     GreeterClient::new(InterceptedService::new(inner, interceptor))
+        // }
+        /// Compress requests with `gzip`.
+        ///
+        /// This requires the server to support it otherwise it might respond with an
+        /// error.
+        // #[must_use]
+        // pub fn send_gzip(mut self) -> Self {
+        //     self.inner = self.inner.send_gzip();
+        //     self
+        // }
+        // /// Enable decompressing responses with `gzip`.
+        // #[must_use]
+        // pub fn accept_gzip(mut self) -> Self {
+        //     self.inner = self.inner.accept_gzip();
+        //     self
+        // }
+        /// Sends a greeting
+        pub async fn say_hello(
+            &mut self,
+            request: Request<super::HelloRequest>,
+        ) -> Result<Response<super::HelloReply>, tonic::Status> {
+            // self.inner.ready().await.map_err(|e| {
+            //     tonic::Status::new(
+            //         tonic::Code::Unknown,
+            //         format!("Service was not ready: {}", e.into()),
+            //     )
+            // })?;
+            let codec = ProstCodec::<super::HelloRequest, super::HelloReply>::default();
+            let path = http::uri::PathAndQuery::from_static("/helloworld.Greeter/SayHello");
+            self.inner.unary(request, codec, path).await
+        }
+    }
+}
+/// Generated server implementations.
+pub mod greeter_server {
+    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
+
+    use crate::protocol::server_desc::ServiceDesc;
+    use crate::protocol::triple::triple_invoker::TripleInvoker;
+    use crate::protocol::DubboGrpcService;
+    use crate::protocol::Invoker;
+    use crate::{BoxFuture, StdError};
+    use async_trait::async_trait;
+    use http_body::Body;
+    use std::sync::Arc;
+    use std::task::Context;
+    use std::task::Poll;
+    use tower_service::Service;
+    use triple::codec::prost::ProstCodec;
+    use triple::empty_body;
+    use triple::invocation::{Request, Response};
+    use triple::server::server::TripleServer;
+    use triple::server::service::UnaryService;
+
+    ///Generated trait containing gRPC methods that should be implemented for use with GreeterServer.
+    #[async_trait]
+    pub trait Greeter: Send + Sync + 'static {
+        /// Sends a greeting
+        async fn say_hello(
+            &self,
+            request: Request<super::HelloRequest>,
+        ) -> Result<Response<super::HelloReply>, tonic::Status>;
+    }
+    /// The greeting service definition.
+    #[derive(Debug)]
+    pub struct GreeterServer<T: Greeter, I> {
+        inner: _Inner<T>,
+        invoker: Option<_Inner<I>>,
+        // accept_compression_encodings: EnabledCompressionEncodings,
+        // send_compression_encodings: EnabledCompressionEncodings,
+    }
+    struct _Inner<T>(Arc<T>);
+    impl<T: Greeter, I> GreeterServer<T, I> {
+        pub fn new(inner: T) -> Self {
+            Self::from_arc(Arc::new(inner))
+        }
+        pub fn from_arc(inner: Arc<T>) -> Self {
+            let inner = _Inner(inner);
+            Self {
+                inner,
+                invoker: None,
+                // accept_compression_encodings: Default::default(),
+                // send_compression_encodings: Default::default(),
+            }
+        }
+        // pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
+        // where
+        //     F: tonic::service::Interceptor,
+        // {
+        //     InterceptedService::new(Self::new(inner), interceptor)
+        // }
+        // /// Enable decompressing requests with `gzip`.
+        // #[must_use]
+        // pub fn accept_gzip(mut self) -> Self {
+        //     self.accept_compression_encodings.enable_gzip();
+        //     self
+        // }
+        // /// Compress responses with `gzip`, if the client supports it.
+        // #[must_use]
+        // pub fn send_gzip(mut self) -> Self {
+        //     self.send_compression_encodings.enable_gzip();
+        //     self
+        // }
+    }
+    impl<T, B, I> Service<http::Request<B>> for GreeterServer<T, I>
+    where
+        T: Greeter,
+        B: Body + Send + 'static,
+        B::Error: Into<StdError> + Send + 'static,
+    {
+        type Response = http::Response<tonic::body::BoxBody>;
+        type Error = std::convert::Infallible;
+        type Future = BoxFuture<Self::Response, Self::Error>;
+        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+        fn call(&mut self, req: http::Request<B>) -> Self::Future {
+            let inner = self.inner.clone();
+            match req.uri().path() {
+                "/helloworld.Greeter/SayHello" => {
+                    #[allow(non_camel_case_types)]
+                    struct SayHelloSvc<T: Greeter>(pub Arc<T>);
+                    impl<T: Greeter> UnaryService<super::HelloRequest> for SayHelloSvc<T> {
+                        type Response = super::HelloReply;
+                        type Future = BoxFuture<Response<Self::Response>, tonic::Status>;
+                        fn call(&mut self, request: Request<super::HelloRequest>) -> Self::Future {
+                            let inner = self.0.clone();
+                            let fut = async move { (*inner).say_hello(request).await };
+                            Box::pin(fut)
+                        }
+                    }
+                    // let accept_compression_encodings = self.accept_compression_encodings;
+                    // let send_compression_encodings = self.send_compression_encodings;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let inner = inner.0;
+                        let method = SayHelloSvc(inner);
+                        let codec = ProstCodec::<super::HelloReply, super::HelloRequest>::default();
+                        // let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
+                        //     accept_compression_encodings,
+                        //     send_compression_encodings,
+                        // );
+                        let mut grpc = TripleServer::new(codec);
+                        let res = grpc.unary(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
+                _ => Box::pin(async move {
+                    Ok(http::Response::builder()
+                        .status(200)
+                        .header("grpc-status", "12")
+                        .header("content-type", "application/grpc")
+                        .body(empty_body())
+                        .unwrap())
+                }),
+            }
+        }
+    }
+    impl<T: Greeter, I: Invoker + Send + Sync + 'static> Clone for GreeterServer<T, I> {
+        fn clone(&self) -> Self {
+            let inner = self.inner.clone();
+            Self {
+                inner,
+                invoker: None,
+                // invoker: if let Some(v) = self.invoker.borrow_mut() {
+                //     Some(v.clone())
+                // } else {
+                //     None
+                // },
+                // accept_compression_encodings: self.accept_compression_encodings,
+                // send_compression_encodings: self.send_compression_encodings,
+            }
+        }
+    }
+
+    impl<T: Greeter> Clone for _Inner<T> {
+        fn clone(&self) -> Self {
+            Self(self.0.clone())
+        }
+    }
+    impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "{:?}", self.0)
+        }
+    }
+    impl<T: Greeter, I: Invoker> DubboGrpcService<I> for GreeterServer<T, I> {
+        fn set_proxy_impl(&mut self, invoker: I) {
+            self.invoker = Some(_Inner(Arc::new(invoker)));
+        }
+
+        fn service_desc(&self) -> ServiceDesc {
+            ServiceDesc::new(
+                "helloworld.Greeter".to_string(),
+                std::collections::HashMap::new(),
+            )
+        }
+    }
+
+    impl<T: Greeter, I> tonic::transport::NamedService for GreeterServer<T, I> {
+        const NAME: &'static str = "helloworld.Greeter";
+    }
+
+    pub fn register_greeter_server<T: Greeter>(server: T) {
+        let s = GreeterServer::<_, TripleInvoker>::new(server);
+        crate::protocol::triple::TRIPLE_SERVICES
+            .write()
+            .unwrap()
+            .insert(
+                "helloworld.Greeter".to_string(),
+                crate::utils::boxed_clone::BoxCloneService::new(s),
+            );
+    }
+}
diff --git a/dubbo/src/echo/mod.rs b/dubbo/src/echo/mod.rs
index 356fa9a..4df0678 100644
--- a/dubbo/src/echo/mod.rs
+++ b/dubbo/src/echo/mod.rs
@@ -1,5 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 pub mod echo_client;
 pub mod echo_server;
+pub mod helloworld;
 
 use futures_util::Stream;
 use futures_util::StreamExt;
@@ -21,7 +39,7 @@ async fn test_client() {
     use futures_util::StreamExt;
     use triple::invocation::*;
 
-    let cli = EchoClient::new().with_uri("127.0.0.1:8888".to_string());
+    let cli = EchoClient::new().with_uri("http://127.0.0.1:8888".to_string());
     let resp = cli
         .say_hello(Request::new(HelloRequest {
             name: "message from client".to_string(),
@@ -49,9 +67,8 @@ async fn test_client() {
 
     let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap();
 
-    let (_parts, mut body) = bidi_resp.into_parts();
-    // let trailer = body.trailer().await.unwrap();
-    // println!("trailer: {:?}", trailer);
+    let (parts, mut body) = bidi_resp.into_parts();
+    println!("parts: {:?}", parts);
     while let Some(item) = body.next().await {
         match item {
             Ok(v) => {
@@ -97,26 +114,39 @@ async fn test_triple_protocol() {
     use crate::common::url::Url;
     use crate::protocol::triple::triple_protocol::TripleProtocol;
     use crate::protocol::Protocol;
-    use crate::utils::boxed_clone::BoxCloneService;
+    use config::get_global_config;
+    use futures::join;
 
-    // crate::init::init();
+    let conf = get_global_config();
+    let server_name = "echo".to_string();
 
-    let esi = EchoServer::<EchoServerImpl>::new(EchoServerImpl {
-        name: "echo".to_string(),
+    echo_server::register_echo_server(EchoServerImpl {
+        name: server_name.clone(),
     });
+    helloworld::greeter_server::register_greeter_server(GreeterImpl {});
+    println!("root config: {:?}", conf);
+    println!(
+        "register service num: {:?}",
+        crate::protocol::triple::TRIPLE_SERVICES
+            .read()
+            .unwrap()
+            .len()
+    );
+
+    let mut urls = Vec::<Url>::new();
+    for (_, proto_conf) in conf.service.protocol_configs.iter() {
+        println!("{:?}", proto_conf);
+        let u = Url {
+            url: proto_conf.to_owned().to_url().clone(),
+            service_key: server_name.clone(),
+        };
+        urls.push(u.clone());
 
-    crate::protocol::triple::TRIPLE_SERVICES
-        .write()
-        .unwrap()
-        .insert("echo".to_string(), BoxCloneService::new(esi));
-
-    println!("triple server running, url: 0.0.0.0:8888");
-    let pro = TripleProtocol::new();
-    pro.export(Url {
-        url: "0.0.0.0:8888".to_string(),
-        service_key: "echo".to_string(),
-    })
-    .await;
+        println!("triple server running, url: 0.0.0.0:8888, {:?}", u);
+        let pro = TripleProtocol::new();
+        let tri_fut = pro.export(u.clone());
+        let _res = join!(tri_fut);
+    }
 }
 
 #[allow(dead_code)]
@@ -132,7 +162,7 @@ impl Echo for EchoServerImpl {
         &self,
         req: Request<HelloRequest>,
     ) -> Result<Response<HelloReply>, tonic::Status> {
-        println!("EchoServer::hello {:?}", req.message);
+        println!("EchoServer::hello {:?}", req.metadata);
 
         Ok(Response::new(HelloReply {
             reply: "hello, dubbo-rust".to_string(),
@@ -145,7 +175,10 @@ impl Echo for EchoServerImpl {
         &self,
         request: Request<triple::server::Streaming<HelloRequest>>,
     ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, tonic::Status> {
-        println!("EchoServer::bidirectional_streaming_echo");
+        println!(
+            "EchoServer::bidirectional_streaming_echo, grpc header: {:?}",
+            request.metadata
+        );
 
         let mut in_stream = request.into_inner();
         let (tx, rx) = mpsc::channel(128);
@@ -212,3 +245,20 @@ fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
         };
     }
 }
+
+use helloworld::greeter_server::Greeter;
+
+struct GreeterImpl {}
+
+#[async_trait]
+impl Greeter for GreeterImpl {
+    async fn say_hello(
+        &self,
+        request: Request<helloworld::HelloRequest>,
+    ) -> Result<Response<helloworld::HelloReply>, tonic::Status> {
+        println!("greeter: req: {:?}", request.metadata);
+        Ok(Response::new(helloworld::HelloReply {
+            message: "hello, rust".to_string(),
+        }))
+    }
+}
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 90ceb1c..1740648 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -18,9 +18,11 @@
 pub mod common;
 pub mod echo;
 pub mod helloworld;
-pub mod init;
 pub mod protocol;
-pub mod registry;
 pub mod utils;
 
-pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
+use std::future::Future;
+use std::pin::Pin;
+
+pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>;
diff --git a/dubbo/src/main.rs b/dubbo/src/main.rs
index db99336..baa5af4 100644
--- a/dubbo/src/main.rs
+++ b/dubbo/src/main.rs
@@ -18,12 +18,14 @@
 pub mod common;
 pub mod echo;
 pub mod helloworld;
-pub mod init;
 pub mod protocol;
-pub mod registry;
 pub mod utils;
 
-pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
+use std::future::Future;
+use std::pin::Pin;
+
+pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>;
 
 #[tokio::main]
 async fn main() {
diff --git a/triple/readme.md b/triple/readme.md
new file mode 100644
index 0000000..c969a78
--- /dev/null
+++ b/triple/readme.md
@@ -0,0 +1,15 @@
+# Triple Protocol
+
+Triple协议使用了hyper作为C/S之间的通信层,支持triple spec中自定义的header。
+
+整体模块划分为:
++ codec
++ server:
+  + Router
+  + 网络事件处理层
+  + 线程池:参考triple-go的线程模型
++ client:
+  + 线程池
+  + 连接处理层,类似于grpc channel层
+
+Triple支持tls、grpc压缩
\ No newline at end of file
diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs
index 1277868..362a0f2 100644
--- a/triple/src/client/grpc.rs
+++ b/triple/src/client/grpc.rs
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-use futures_util::{StreamExt, TryStreamExt};
+use std::str::FromStr;
+
+use futures_util::{future, stream, StreamExt, TryStreamExt};
 use http::HeaderValue;
 
 use crate::codec::Codec;
-use crate::invocation::{IntoStreamingRequest, Response};
+use crate::invocation::{IntoStreamingRequest, Request, Response};
 use crate::server::encode::encode;
 use crate::server::Streaming;
 
 #[derive(Debug, Clone, Default)]
 pub struct TripleClient {
-    host: Option<http::uri::Authority>,
+    host: Option<http::Uri>,
     inner: ConnectionPool,
 }
 
@@ -55,52 +57,42 @@ impl TripleClient {
         }
     }
 
-    pub fn with_authority(self, host: http::uri::Authority) -> Self {
+    /// host: http://0.0.0.0:8888
+    pub fn with_host(self, host: String) -> Self {
+        let uri = http::Uri::from_str(&host).unwrap();
         TripleClient {
-            host: Some(host),
+            host: Some(uri),
             ..self
         }
     }
 }
 
 impl TripleClient {
-    pub async fn bidi_streaming<C, M1, M2>(
-        &mut self,
-        req: impl IntoStreamingRequest<Message = M1>,
-        mut codec: C,
+    fn map_request(
+        &self,
         path: http::uri::PathAndQuery,
-    ) -> Result<Response<Streaming<M2>>, tonic::Status>
-    where
-        C: Codec<Encode = M1, Decode = M2>,
-        M1: Send + Sync + 'static,
-        M2: Send + Sync + 'static,
-    {
-        let req = req.into_streaming_request();
-        let en = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream();
-        let body = hyper::Body::wrap_stream(en);
-
-        let mut parts = http::uri::Parts::default();
+        body: hyper::Body,
+    ) -> http::Request<hyper::Body> {
+        let mut parts = self.host.clone().unwrap().into_parts();
         parts.path_and_query = Some(path);
-        parts.authority = self.host.clone();
-        parts.scheme = Some(http::uri::Scheme::HTTP);
 
         let uri = http::Uri::from_parts(parts).unwrap();
-
         let mut req = hyper::Request::builder()
             .version(http::Version::HTTP_2)
             .uri(uri.clone())
             .method("POST")
             .body(body)
             .unwrap();
+
         *req.version_mut() = http::Version::HTTP_2;
         req.headers_mut()
             .insert("method", HeaderValue::from_static("POST"));
         req.headers_mut().insert(
             "scheme",
-            HeaderValue::from_str(uri.clone().scheme_str().unwrap()).unwrap(),
+            HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(),
         );
         req.headers_mut()
-            .insert("path", HeaderValue::from_str(uri.clone().path()).unwrap());
+            .insert("path", HeaderValue::from_str(uri.path()).unwrap());
         req.headers_mut().insert(
             "authority",
             HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
@@ -136,20 +128,78 @@ impl TripleClient {
         //     TripleTraceProtoBin  = "tri-trace-proto-bin"
         //     TripleUnitInfo       = "tri-unit-info"
         // )
+        req
+    }
+
+    pub async fn unary<C, M1, M2>(
+        &self,
+        req: Request<M1>,
+        mut codec: C,
+        path: http::uri::PathAndQuery,
+    ) -> Result<Response<M2>, tonic::Status>
+    where
+        C: Codec<Encode = M1, Decode = M2>,
+        M1: Send + Sync + 'static,
+        M2: Send + Sync + 'static,
+    {
+        let req = req.map(|m| stream::once(future::ready(m)));
+        let body_stream = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream();
+        let body = hyper::Body::wrap_stream(body_stream);
 
+        let req = self.map_request(path, body);
         let cli = self.inner.clone().builder();
         let response = cli.request(req).await;
 
         match response {
             Ok(v) => {
                 let resp = v.map(|body| Streaming::new(body, codec.decoder()));
+                let (mut parts, body) = Response::from_http(resp).into_parts();
+
+                futures_util::pin_mut!(body);
+
+                let message = body.try_next().await?.ok_or_else(|| {
+                    tonic::Status::new(tonic::Code::Internal, "Missing response message.")
+                })?;
 
-                let (_parts, body) = resp.into_parts();
-                Ok(Response::new(body))
+                if let Some(trailers) = body.trailer().await? {
+                    let mut h = parts.into_headers();
+                    h.extend(trailers.into_headers());
+                    parts = tonic::metadata::MetadataMap::from_headers(h);
+                }
+
+                Ok(Response::from_parts(parts, message))
             }
-            Err(err) => {
-                Err(tonic::Status::new(tonic::Code::Internal, err.to_string()))
+            Err(err) => Err(tonic::Status::new(tonic::Code::Internal, err.to_string())),
+        }
+    }
+
+    pub async fn bidi_streaming<C, M1, M2>(
+        &mut self,
+        req: impl IntoStreamingRequest<Message = M1>,
+        mut codec: C,
+        path: http::uri::PathAndQuery,
+    ) -> Result<Response<Streaming<M2>>, tonic::Status>
+    where
+        C: Codec<Encode = M1, Decode = M2>,
+        M1: Send + Sync + 'static,
+        M2: Send + Sync + 'static,
+    {
+        let req = req.into_streaming_request();
+        let en = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream();
+        let body = hyper::Body::wrap_stream(en);
+
+        let req = self.map_request(path, body);
+
+        let cli = self.inner.clone().builder();
+        let response = cli.request(req).await;
+
+        match response {
+            Ok(v) => {
+                let resp = v.map(|body| Streaming::new(body, codec.decoder()));
+
+                Ok(Response::from_http(resp))
             }
+            Err(err) => Err(tonic::Status::new(tonic::Code::Internal, err.to_string())),
         }
     }
 }
diff --git a/triple/src/codec/mod.rs b/triple/src/codec/mod.rs
index f6e61d9..c21b69a 100644
--- a/triple/src/codec/mod.rs
+++ b/triple/src/codec/mod.rs
@@ -16,6 +16,7 @@
  */
 
 pub mod buffer;
+pub mod prost;
 pub mod serde_codec;
 
 use std::io;
diff --git a/triple/src/codec/prost.rs b/triple/src/codec/prost.rs
new file mode 100644
index 0000000..162ac51
--- /dev/null
+++ b/triple/src/codec/prost.rs
@@ -0,0 +1,236 @@
+use super::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder};
+use prost::Message;
+use std::marker::PhantomData;
+use tonic::{Code, Status};
+
+/// A [`Codec`] that implements `application/grpc+proto` via the prost library.
+#[derive(Debug, Clone)]
+pub struct ProstCodec<T, U> {
+    _pd: PhantomData<(T, U)>,
+}
+
+impl<T, U> Default for ProstCodec<T, U> {
+    fn default() -> Self {
+        Self { _pd: PhantomData }
+    }
+}
+
+impl<T, U> Codec for ProstCodec<T, U>
+where
+    T: Message + Send + 'static,
+    U: Message + Default + Send + 'static,
+{
+    type Encode = T;
+    type Decode = U;
+
+    type Encoder = ProstEncoder<T>;
+    type Decoder = ProstDecoder<U>;
+
+    fn encoder(&mut self) -> Self::Encoder {
+        ProstEncoder(PhantomData)
+    }
+
+    fn decoder(&mut self) -> Self::Decoder {
+        ProstDecoder(PhantomData)
+    }
+}
+
+/// A [`Encoder`] that knows how to encode `T`.
+#[derive(Debug, Clone, Default)]
+pub struct ProstEncoder<T>(PhantomData<T>);
+
+impl<T: Message> Encoder for ProstEncoder<T> {
+    type Item = T;
+    type Error = Status;
+
+    fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
+        item.encode(buf)
+            .expect("Message only errors if not enough space");
+
+        Ok(())
+    }
+}
+
+/// A [`Decoder`] that knows how to decode `U`.
+#[derive(Debug, Clone, Default)]
+pub struct ProstDecoder<U>(PhantomData<U>);
+
+impl<U: Message + Default> Decoder for ProstDecoder<U> {
+    type Item = U;
+    type Error = Status;
+
+    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
+        let item = Message::decode(buf)
+            .map(Option::Some)
+            .map_err(from_decode_error)?;
+
+        Ok(item)
+    }
+}
+
+fn from_decode_error(error: prost::DecodeError) -> Status {
+    // Map Protobuf parse errors to an INTERNAL status code, as per
+    // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
+    Status::new(Code::Internal, error.to_string())
+}
+
+// #[cfg(test)]
+// mod tests {
+//     use crate::codec::compression::SingleMessageCompressionOverride;
+//     use crate::codec::{
+//         encode_server, DecodeBuf, Decoder, EncodeBuf, Encoder, Streaming, HEADER_SIZE,
+//     };
+//     use crate::Status;
+//     use bytes::{Buf, BufMut, BytesMut};
+//     use http_body::Body;
+
+//     const LEN: usize = 10000;
+
+//     #[tokio::test]
+//     async fn decode() {
+//         let decoder = MockDecoder::default();
+
+//         let msg = vec![0u8; LEN];
+
+//         let mut buf = BytesMut::new();
+
+//         buf.reserve(msg.len() + HEADER_SIZE);
+//         buf.put_u8(0);
+//         buf.put_u32(msg.len() as u32);
+
+//         buf.put(&msg[..]);
+
+//         let body = body::MockBody::new(&buf[..], 10005, 0);
+
+//         let mut stream = Streaming::new_request(decoder, body, None);
+
+//         let mut i = 0usize;
+//         while let Some(output_msg) = stream.message().await.unwrap() {
+//             assert_eq!(output_msg.len(), msg.len());
+//             i += 1;
+//         }
+//         assert_eq!(i, 1);
+//     }
+
+//     #[tokio::test]
+//     async fn encode() {
+//         let encoder = MockEncoder::default();
+
+//         let msg = Vec::from(&[0u8; 1024][..]);
+
+//         let messages = std::iter::repeat_with(move || Ok::<_, Status>(msg.clone())).take(10000);
+//         let source = futures_util::stream::iter(messages);
+
+//         let body = encode_server(
+//             encoder,
+//             source,
+//             None,
+//             SingleMessageCompressionOverride::default(),
+//         );
+
+//         futures_util::pin_mut!(body);
+
+//         while let Some(r) = body.data().await {
+//             r.unwrap();
+//         }
+//     }
+
+//     #[derive(Debug, Clone, Default)]
+//     struct MockEncoder;
+
+//     impl Encoder for MockEncoder {
+//         type Item = Vec<u8>;
+//         type Error = Status;
+
+//         fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
+//             buf.put(&item[..]);
+//             Ok(())
+//         }
+//     }
+
+//     #[derive(Debug, Clone, Default)]
+//     struct MockDecoder;
+
+//     impl Decoder for MockDecoder {
+//         type Item = Vec<u8>;
+//         type Error = Status;
+
+//         fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
+//             let out = Vec::from(buf.chunk());
+//             buf.advance(LEN);
+//             Ok(Some(out))
+//         }
+//     }
+
+//     mod body {
+//         use crate::Status;
+//         use bytes::Bytes;
+//         use http_body::Body;
+//         use std::{
+//             pin::Pin,
+//             task::{Context, Poll},
+//         };
+
+//         #[derive(Debug)]
+//         pub(super) struct MockBody {
+//             data: Bytes,
+
+//             // the size of the partial message to send
+//             partial_len: usize,
+
+//             // the number of times we've sent
+//             count: usize,
+//         }
+
+//         impl MockBody {
+//             pub(super) fn new(b: &[u8], partial_len: usize, count: usize) -> Self {
+//                 MockBody {
+//                     data: Bytes::copy_from_slice(b),
+//                     partial_len,
+//                     count,
+//                 }
+//             }
+//         }
+
+//         impl Body for MockBody {
+//             type Data = Bytes;
+//             type Error = Status;
+
+//             fn poll_data(
+//                 mut self: Pin<&mut Self>,
+//                 cx: &mut Context<'_>,
+//             ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+//                 // every other call to poll_data returns data
+//                 let should_send = self.count % 2 == 0;
+//                 let data_len = self.data.len();
+//                 let partial_len = self.partial_len;
+//                 let count = self.count;
+//                 if data_len > 0 {
+//                     let result = if should_send {
+//                         let response =
+//                             self.data
+//                                 .split_to(if count == 0 { partial_len } else { data_len });
+//                         Poll::Ready(Some(Ok(response)))
+//                     } else {
+//                         cx.waker().wake_by_ref();
+//                         Poll::Pending
+//                     };
+//                     // make some fake progress
+//                     self.count += 1;
+//                     result
+//                 } else {
+//                     Poll::Ready(None)
+//                 }
+//             }
+
+//             #[allow(clippy::drop_ref)]
+//             fn poll_trailers(
+//                 self: Pin<&mut Self>,
+//                 cx: &mut Context<'_>,
+//             ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+//                 drop(cx);
+//                 Poll::Ready(Ok(None))
+//             }
+//         }
+//     }
+// }
diff --git a/triple/src/invocation.rs b/triple/src/invocation.rs
index 1655312..de2e36e 100644
--- a/triple/src/invocation.rs
+++ b/triple/src/invocation.rs
@@ -39,6 +39,10 @@ impl<T> Request<T> {
         (self.metadata, self.message)
     }
 
+    pub fn from_parts(metadata: MetadataMap, message: T) -> Self {
+        Request { message, metadata }
+    }
+
     pub fn from_http(req: http::Request<T>) -> Self {
         let (parts, body) = req.into_parts();
         Request {
@@ -54,6 +58,17 @@ impl<T> Request<T> {
 
         http_req
     }
+
+    pub fn map<F, U>(self, f: F) -> Request<U>
+    where
+        F: FnOnce(T) -> U,
+    {
+        let m = f(self.message);
+        Request {
+            message: m,
+            metadata: self.metadata,
+        }
+    }
 }
 
 pub struct Response<T> {
@@ -85,6 +100,14 @@ impl<T> Response<T> {
         http_resp
     }
 
+    pub fn from_http(resp: http::Response<T>) -> Self {
+        let (part, body) = resp.into_parts();
+        Response {
+            message: body,
+            metadata: MetadataMap::from_headers(part.headers),
+        }
+    }
+
     pub fn map<F, U>(self, f: F) -> Response<U>
     where
         F: FnOnce(T) -> U,
diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs
index c76f448..9e4ebc1 100644
--- a/triple/src/server/encode.rs
+++ b/triple/src/server/encode.rs
@@ -162,7 +162,6 @@ where
             tonic::Status::ok("")
         };
         let http = status.to_http();
-        println!("status: {:?}", http.headers().clone());
 
         Poll::Ready(Ok(Some(http.headers().to_owned())))
     }
diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs
index 6474d6c..859f545 100644
--- a/triple/src/server/server.rs
+++ b/triple/src/server/server.rs
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-use bytes::{Buf, BytesMut};
+use futures_util::{future, stream, StreamExt, TryStreamExt};
 use http_body::Body;
-use std::fmt::Debug;
 
-use crate::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder};
+use crate::codec::Codec;
 use crate::invocation::Request;
 use crate::server::encode::encode_server;
 use crate::server::service::{StreamingSvc, UnaryService};
@@ -75,55 +74,30 @@ where
     where
         S: UnaryService<T::Decode, Response = T::Encode>,
         B: Body + Send + 'static,
-        B::Error: Debug,
+        B::Error: Into<crate::Error> + Send,
     {
-        let (_parts, body) = req.into_parts();
-        let req_body = hyper::body::to_bytes(body).await.unwrap();
-        let v = req_body.chunk();
-        let mut req_byte = BytesMut::from(v);
-        let mut de = DecodeBuf::new(&mut req_byte, v.len());
-        let decoder = self
-            .codec
-            .decoder()
-            .decode(&mut de)
-            .map(|v| v.unwrap())
-            .unwrap();
-        let req = Request::new(decoder);
-
-        let resp = service.call(req).await;
-
-        let resp = match resp {
-            Ok(r) => r,
-            Err(status) => {
-                let (mut parts, _body) = http::Response::new(()).into_parts();
-                parts.headers.insert(
-                    http::header::CONTENT_TYPE,
-                    http::HeaderValue::from_static("application/grpc"),
-                );
-                parts.status = status.to_http().status();
-
-                return http::Response::from_parts(parts, crate::empty_body());
-            }
-        };
-        let (mut parts, body) = resp.into_http().into_parts();
+        let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder()));
+        let (parts, mut body) = Request::from_http(req_stream).into_parts();
+        let msg = body
+            .try_next()
+            .await
+            .unwrap()
+            .ok_or_else(|| tonic::Status::new(tonic::Code::Unknown, "request wrong"));
 
-        // let data = hyper::body::aggregate(body)
-        // let b = body.size_hint();
-        let mut bytes = BytesMut::with_capacity(100);
-        let mut dst = EncodeBuf::new(&mut bytes);
-        let _res = self.codec.encoder().encode(body, &mut dst);
-        let data = bytes.to_vec();
+        let resp = service.call(Request::from_parts(parts, msg.unwrap())).await;
 
-        let resp_body = hyper::Body::from(data);
+        let (mut parts, resp_body) = resp.unwrap().into_http().into_parts();
+        let resp_body = encode_server(
+            self.codec.encoder(),
+            stream::once(future::ready(resp_body)).map(Ok).into_stream(),
+        );
 
+        parts.headers.insert(
+            http::header::CONTENT_TYPE,
+            http::HeaderValue::from_static("application/grpc"),
+        );
         parts.status = http::StatusCode::OK;
-        // http::Response::from_parts(parts, resp_body.map_err(|err| err.into()).boxed_unsync())
-        http::Response::from_parts(
-            parts,
-            resp_body
-                .map_err(|err| tonic::Status::new(tonic::Code::Internal, err.to_string()))
-                .boxed_unsync(),
-        )
+        http::Response::from_parts(parts, BoxBody::new(resp_body))
     }
 }
 
diff --git a/triple/src/transport/service.rs b/triple/src/transport/service.rs
index e470601..fdd6ecb 100644
--- a/triple/src/transport/service.rs
+++ b/triple/src/transport/service.rs
@@ -125,17 +125,21 @@ impl DubboServer {
             inner: self.services.get(&name).unwrap().clone(),
         };
 
+        let http2_keepalive_timeout = self
+            .http2_keepalive_timeout
+            .unwrap_or_else(|| Duration::new(60, 0));
+
         hyper::Server::bind(&addr)
             .http2_only(self.accept_http2)
             .http2_max_concurrent_streams(self.max_concurrent_streams)
             .http2_initial_connection_window_size(self.init_connection_window_size)
             .http2_initial_stream_window_size(self.init_stream_window_size)
             .http2_keep_alive_interval(self.http2_keepalive_interval)
-            .http2_keep_alive_timeout(self.http2_keepalive_timeout.unwrap())
+            .http2_keep_alive_timeout(http2_keepalive_timeout)
             .http2_max_frame_size(self.max_frame_size)
             .serve(svc)
             .await
-            .map_err(|err| println!("Error: {:?}", err))
+            .map_err(|err| println!("hyper serve, Error: {:?}", err))
             .unwrap();
 
         Ok(())
@@ -170,8 +174,7 @@ where
 impl BusinessConfig for DubboServer {
     fn init() -> Self {
         let conf = config::get_global_config();
-        DubboServer::new()
-            .with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string()))
+        DubboServer::new().with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string()))
     }
 
     fn load() -> Result<(), std::convert::Infallible> {

Reply via email to