This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit fb37cf280af4baebc14ccaff922b0b1fcfade01a
Author: yangyang <[email protected]>
AuthorDate: Tue Jul 19 22:35:59 2022 +0800

    feat(triple): impl triple protocol based json serialization
---
 Cargo.toml                                      |   3 +-
 config/src/config.rs                            |  86 ++++++++++
 config/src/lib.rs                               |   4 +
 dubbo/Cargo.toml                                |  10 +-
 dubbo/src/common/url.rs                         |   2 +-
 dubbo/src/echo/echo_client.rs                   | 124 ++++++++++++++
 dubbo/src/echo/echo_server.rs                   | 188 +++++++++++++++++++++
 dubbo/src/echo/mod.rs                           | 214 ++++++++++++++++++++++++
 dubbo/src/init.rs                               | 110 ++++++++++++
 dubbo/src/lib.rs                                |   5 +
 dubbo/src/main.rs                               |   7 +
 dubbo/src/protocol/grpc/grpc_protocol.rs        |   2 +-
 dubbo/src/protocol/invocation.rs                |  44 +++++
 dubbo/src/protocol/mod.rs                       |  11 +-
 dubbo/src/protocol/triple/mod.rs                |  21 +++
 dubbo/src/protocol/triple/triple_exporter.rs    |  23 +++
 dubbo/src/protocol/triple/triple_invoker.rs     |  38 +++++
 dubbo/src/protocol/triple/triple_protocol.rs    |  48 ++++++
 dubbo/src/protocol/triple/triple_server.rs      |  44 +++++
 {dubbo => triple}/Cargo.toml                    |  18 +-
 triple/src/client/grpc.rs                       | 155 +++++++++++++++++
 dubbo/src/lib.rs => triple/src/client/mod.rs    |   7 +-
 triple/src/codec/buffer.rs                      | 138 +++++++++++++++
 triple/src/codec/mod.rs                         |  71 ++++++++
 triple/src/codec/serde_codec.rs                 |  89 ++++++++++
 triple/src/invocation.rs                        | 125 ++++++++++++++
 dubbo/src/main.rs => triple/src/lib.rs          |  22 ++-
 {dubbo => triple}/src/main.rs                   |  24 ++-
 dubbo/src/lib.rs => triple/src/server/consts.rs |  11 +-
 triple/src/server/decode.rs                     | 166 ++++++++++++++++++
 triple/src/server/encode.rs                     | 169 +++++++++++++++++++
 dubbo/src/main.rs => triple/src/server/mod.rs   |  15 +-
 triple/src/server/server.rs                     | 138 +++++++++++++++
 triple/src/server/service.rs                    |  68 ++++++++
 dubbo/src/lib.rs => triple/src/transport/mod.rs |   7 +-
 triple/src/transport/service.rs                 | 181 ++++++++++++++++++++
 36 files changed, 2343 insertions(+), 45 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 970aeb1..787689c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,5 +5,6 @@ members = [
   "metadata",
   "common",
   "config",
-  "dubbo"
+  "dubbo",
+  "triple"
 ]
diff --git a/config/src/config.rs b/config/src/config.rs
new file mode 100644
index 0000000..771c7ca
--- /dev/null
+++ b/config/src/config.rs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{any, collections::HashMap};
+
+/// used to storage all structed config, from some source: cmd, file..;
+/// Impl Config trait, business init by read Config trait
+#[allow(dead_code)]
+pub struct RootConfig {
+    name: String,
+    data: HashMap<String, Box<dyn any::Any>>,
+}
+
+pub fn get_global_config() -> RootConfig {
+    RootConfig::new()
+}
+
+impl RootConfig {
+    pub fn new() -> Self {
+        Self {
+            name: "dubbo".to_string(),
+            data: HashMap::new(),
+        }
+    }
+
+    pub fn load(&mut self) {
+        // 通过环境变量读取某个文件。加在到内存中
+        self.data.insert(
+            "dubbo.provider.url".to_string(),
+            
Box::new("dubbo://127.0.0.1:8888/?serviceName=hellworld".to_string()),
+        );
+        // self.data.insert("dubbo.consume.", v)
+    }
+}
+
+impl Config for RootConfig {
+    fn bool(&self, key: String) -> bool {
+        match self.data.get(&key) {
+            None => false,
+            Some(val) => {
+                if let Some(v) = val.downcast_ref::<bool>() {
+                    return *v;
+                } else {
+                    false
+                }
+            }
+        }
+    }
+
+    fn string(&self, key: String) -> String {
+        match self.data.get(&key) {
+            None => "".to_string(),
+            Some(val) => {
+                if let Some(v) = val.downcast_ref::<String>() {
+                    return v.into();
+                } else {
+                    "".to_string()
+                }
+            }
+        }
+    }
+}
+
+pub trait BusinessConfig {
+    fn init() -> Self;
+    fn load() -> Result<(), std::convert::Infallible>;
+}
+
+pub trait Config {
+    fn bool(&self, key: String) -> bool;
+    fn string(&self, key: String) -> String;
+}
diff --git a/config/src/lib.rs b/config/src/lib.rs
index 3e01853..edbaf58 100644
--- a/config/src/lib.rs
+++ b/config/src/lib.rs
@@ -15,6 +15,10 @@
  * limitations under the License.
  */
 
+pub mod config;
+
+pub use config::*;
+
 #[cfg(test)]
 mod tests {
     #[test]
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 7ce6865..1a693ba 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -11,13 +11,14 @@ path = "src/helloworld/client.rs"
 
 [dependencies]
 h2 = {version = "0.3", optional = true}
-hyper = "0.14.19"
+hyper = { version = "0.14.19", features = ["full"]}
 http = "0.2"
 tonic = {version ="0.7.2", features = ["compression",]}
 tower-service = "0.3.1"
 http-body = "0.4.4"
 tower = "0.4.12"
 futures-util = {version = "0.3", default-features = false}
+futures-core = {version = "0.3", default-features = false}
 tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", 
"macros", "net", "signal"] }
 prost-derive = {version = "0.10", optional = true}
 prost = "0.10.4"
@@ -25,4 +26,11 @@ prost-types = { version = "0.6", default-features = false }
 lazy_static = "1.3.0"
 async-trait = "0.1.56"
 tower-layer = "0.3"
+bytes = "1.0"
+pin-project = "1.0"
+serde_json = "1.0.82"
+serde = {version="1.0.138", features = ["derive"]}
+tokio-stream = "0.1"
 
+config = {path = "../config"}
+triple = {path = "../triple"}
\ No newline at end of file
diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs
index e610de3..5d7ce69 100644
--- a/dubbo/src/common/url.rs
+++ b/dubbo/src/common/url.rs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct Url {
     pub url: String,
     pub service_key: String,
diff --git a/dubbo/src/echo/echo_client.rs b/dubbo/src/echo/echo_client.rs
new file mode 100644
index 0000000..8db5e89
--- /dev/null
+++ b/dubbo/src/echo/echo_client.rs
@@ -0,0 +1,124 @@
+use std::str::FromStr;
+
+use super::echo_server::{HelloReply, HelloRequest};
+use bytes::Buf;
+
+use triple::client::TripleClient;
+use triple::codec::serde_codec::SerdeCodec;
+use triple::invocation::*;
+use triple::server::Streaming;
+
+pub struct EchoClient {
+    inner: TripleClient,
+    uri: String,
+}
+
+impl EchoClient {
+    pub fn new() -> Self {
+        Self {
+            inner: TripleClient::new(),
+            uri: "".to_string(),
+        }
+    }
+
+    pub fn with_uri(mut self, uri: String) -> Self {
+        self.uri = uri;
+        self.inner = self
+            .inner
+            
.with_authority(http::uri::Authority::from_str(&self.uri).unwrap());
+        self
+    }
+
+    // pub async fn connect(&self, url: &str) {
+    //     self.inner.request(req)
+    // }
+
+    pub async fn bidirectional_streaming_echo(
+        mut self,
+        req: impl IntoStreamingRequest<Message = HelloRequest>,
+    ) -> Result<Response<Streaming<HelloReply>>, tonic::Status> {
+        let codec = SerdeCodec::<HelloRequest, HelloReply>::default();
+        self.inner
+            .bidi_streaming(
+                req,
+                codec,
+                http::uri::PathAndQuery::from_static("/bidi_stream"),
+            )
+            .await
+        // Stream trait to Body
+        // let mut codec = SerdeCodec::<HelloRequest, HelloReply>::default();
+        // let stream = req.into_streaming_request();
+        // let en = encode(codec.encoder(), stream.into_inner().map(Ok));
+        // let body = hyper::Body::wrap_stream(en);
+
+        // let req = http::Request::builder()
+        //     .version(Version::HTTP_2)
+        //     .uri(self.uri.clone() + "/bidi_stream")
+        //     .method("POST")
+        //     .body(body)
+        //     .unwrap();
+
+        // let response = self.inner.request(req).await;
+
+        // match response {
+        //     Ok(v) => {
+        //         println!("response: {:?}", v);
+        //         // println!("grpc status: {:?}", v)
+        //         let mut resp = v.map(|body| Streaming::new(body, 
codec.decoder()));
+        //         // TODO: rpc response to http response
+        //         let trailers_only_status = 
tonic::Status::from_header_map(resp.headers_mut());
+        //         println!("trailer only status: {:?}", trailers_only_status);
+
+        //         let (parts, mut body) = resp.into_parts();
+        //         let trailer = body.trailer().await.unwrap();
+        //         println!("trailer: {:?}", trailer);
+
+        //         // if let Some(trailer) = trailer.take() {
+        //         //     println!("trailer: {:?}", trailer);
+        //         // }
+        //         return Ok(Response::new(body));
+        //     }
+        //     Err(err) => {
+        //         println!("error: {}", err);
+        //         return Err(tonic::Status::new(tonic::Code::Internal, 
err.to_string()));
+        //     }
+        // }
+    }
+
+    pub async fn say_hello(
+        &self,
+        req: Request<HelloRequest>,
+    ) -> Result<Response<HelloReply>, tonic::Status> {
+        let (_parts, body) = req.into_parts();
+        let v = serde_json::to_vec(&body).unwrap();
+        let req = hyper::Request::builder()
+            .uri("http://".to_owned() + &self.uri.clone() + "/hello")
+            .method("POST")
+            .body(hyper::Body::from(v))
+            .unwrap();
+
+        println!("request: {:?}", req);
+        let response = 
hyper::Client::builder().build_http().request(req).await;
+
+        match response {
+            Ok(v) => {
+                println!("{:?}", v);
+                let (_parts, body) = v.into_parts();
+                let req_body = hyper::body::to_bytes(body).await.unwrap();
+                let v = req_body.chunk();
+                // let codec = SerdeCodec::<HelloReply, 
HelloRequest>::default();
+                let data: HelloReply = match serde_json::from_slice(v) {
+                    Ok(data) => data,
+                    Err(err) => {
+                        return Err(tonic::Status::new(tonic::Code::Internal, 
err.to_string()))
+                    }
+                };
+                Ok(Response::new(data))
+            }
+            Err(err) => {
+                println!("{}", err);
+                return Err(tonic::Status::new(tonic::Code::Internal, 
err.to_string()));
+            }
+        }
+    }
+}
diff --git a/dubbo/src/echo/echo_server.rs b/dubbo/src/echo/echo_server.rs
new file mode 100644
index 0000000..d6e9476
--- /dev/null
+++ b/dubbo/src/echo/echo_server.rs
@@ -0,0 +1,188 @@
+use async_trait::async_trait;
+
+use tonic::codegen::BoxFuture;
+use triple::codec::serde_codec::SerdeCodec;
+// 定义EchoServer
+// EchoServer 实现了自定义接口,同时可以处理请求分发
+use crate::protocol::triple::triple_invoker::TripleInvoker;
+use crate::protocol::DubboGrpcService;
+use crate::protocol::Invoker;
+use http_body::Body;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use std::task::Poll;
+use tower_service::Service;
+use triple::invocation::{Request, Response};
+use triple::server::server::TripleServer;
+use triple::server::service::{StreamingSvc, UnaryService};
+use triple::BoxBody;
+
+pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+
+#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
+pub struct HelloRequest {
+    pub name: String,
+}
+
+#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
+pub struct HelloReply {
+    pub reply: String,
+}
+
+#[async_trait]
+pub trait Echo: Send + Sync + 'static {
+    async fn hello(
+        &self,
+        req: Request<HelloRequest>,
+    ) -> Result<Response<HelloReply>, tonic::Status>;
+
+    type BidirectionalStreamingEchoStream: futures_util::Stream<Item = 
Result<HelloReply, tonic::Status>>
+        + Send
+        + 'static;
+    /// BidirectionalStreamingEcho is bidi streaming.
+    async fn bidirectional_streaming_echo(
+        &self,
+        request: Request<triple::server::Streaming<HelloRequest>>,
+    ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, 
tonic::Status>;
+}
+
+struct _Inner<T>(Arc<T>);
+
+#[derive(Clone)]
+pub struct EchoServer<T, I = TripleInvoker> {
+    inner: _Inner<T>,
+    invoker: Option<I>,
+}
+
+impl<T, I> EchoServer<T, I> {
+    pub fn new(inner: T) -> Self {
+        Self {
+            inner: _Inner(Arc::new(inner)),
+            invoker: None,
+        }
+    }
+}
+
+impl<T: Echo, I, B> Service<http::Request<B>> for EchoServer<T, I>
+where
+    B: Body + Send + 'static,
+    B::Error: Into<StdError> + Debug + Send,
+    <B as Body>::Data: Send,
+    I: Invoker + Send,
+{
+    type Response = http::Response<BoxBody>;
+
+    type Error = std::convert::Infallible;
+
+    type Future = BoxFuture<Self::Response, Self::Error>;
+
+    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> 
Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: http::Request<B>) -> Self::Future {
+        let inner = self.inner.clone();
+        match req.uri().path() {
+            "/hello" => {
+                struct UnaryServer<T> {
+                    inner: _Inner<T>,
+                }
+
+                impl<T: Echo> UnaryService<HelloRequest> for UnaryServer<T> {
+                    type Response = HelloReply;
+
+                    type Future = BoxFuture<Response<Self::Response>, 
tonic::Status>;
+
+                    fn call(&mut self, req: Request<HelloRequest>) -> 
Self::Future {
+                        let inner = self.inner.0.clone();
+                        let fut = async move { inner.hello(req).await };
+                        Box::pin(fut)
+                    }
+                }
+
+                let fut = async move {
+                    let mut server =
+                        TripleServer::new(SerdeCodec::<HelloReply, 
HelloRequest>::default());
+                    let resp = server.unary(UnaryServer { inner }, req).await;
+                    Ok(resp)
+                };
+
+                Box::pin(fut)
+            }
+            "/bidi_stream" => {
+                struct StreamingServer<T> {
+                    inner: _Inner<T>,
+                }
+                impl<T: Echo> StreamingSvc<HelloRequest> for 
StreamingServer<T> {
+                    type Response = HelloReply;
+
+                    type ResponseStream = T::BidirectionalStreamingEchoStream;
+
+                    type Future = BoxFuture<Response<Self::ResponseStream>, 
tonic::Status>;
+
+                    fn call(
+                        &mut self,
+                        req: Request<triple::server::Streaming<HelloRequest>>,
+                    ) -> Self::Future {
+                        let inner = self.inner.0.clone();
+                        let fut = async move { 
inner.bidirectional_streaming_echo(req).await };
+                        Box::pin(fut)
+                    }
+                }
+
+                let fut = async move {
+                    let mut server =
+                        TripleServer::new(SerdeCodec::<HelloReply, 
HelloRequest>::default());
+                    let resp = server.bidi_streaming(StreamingServer { inner 
}, req).await;
+                    Ok(resp)
+                };
+
+                Box::pin(fut)
+            }
+            _ => {
+                Box::pin(async move {
+                    Ok(http::Response::builder()
+                        .status(200)
+                        .header("grpc-status", "12")
+                        .header("content-type", "application/grpc")
+                        // 
.body(hyper::Body::from("implement...").map_err(|err| match err {}).boxed())
+                        // 
.body(hyper::Body::from("implement...").map_err(|err| 
std::convert::Infallible).into())
+                        // .body(req.into_body())
+                        .body(
+                            http_body::Empty::new()
+                                .map_err(|err| match err {})
+                                .boxed_unsync(),
+                        )
+                        .unwrap())
+                })
+            }
+        }
+    }
+}
+
+impl<T, I> DubboGrpcService<I> for EchoServer<T, I>
+where
+    I: Invoker + Send + Sync + 'static,
+{
+    fn set_proxy_impl(&mut self, invoker: I) {
+        self.invoker = Some(invoker);
+    }
+
+    fn service_desc(&self) -> crate::protocol::server_desc::ServiceDesc {
+        todo!()
+    }
+}
+
+impl<T> Clone for _Inner<T> {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
+
+impl<T: Debug> Debug for _Inner<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Inner {:?}", self.0)
+    }
+}
diff --git a/dubbo/src/echo/mod.rs b/dubbo/src/echo/mod.rs
new file mode 100644
index 0000000..356fa9a
--- /dev/null
+++ b/dubbo/src/echo/mod.rs
@@ -0,0 +1,214 @@
+pub mod echo_client;
+pub mod echo_server;
+
+use futures_util::Stream;
+use futures_util::StreamExt;
+use std::io::ErrorKind;
+use std::pin::Pin;
+
+use tokio::sync::mpsc;
+
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::async_trait;
+
+pub use self::echo_server::{Echo, EchoServer, HelloReply, HelloRequest};
+use triple::invocation::*;
+
+#[tokio::test]
+async fn test_client() {
+    use self::echo_client::EchoClient;
+    use self::echo_server::HelloRequest;
+    use futures_util::StreamExt;
+    use triple::invocation::*;
+
+    let cli = EchoClient::new().with_uri("127.0.0.1:8888".to_string());
+    let resp = cli
+        .say_hello(Request::new(HelloRequest {
+            name: "message from client".to_string(),
+        }))
+        .await;
+    let resp = match resp {
+        Ok(resp) => resp,
+        Err(err) => return println!("{:?}", err),
+    };
+    let (_parts, body) = resp.into_parts();
+    println!("Response: {:?}", body);
+
+    let data = vec![
+        HelloRequest {
+            name: "msg1 from client".to_string(),
+        },
+        HelloRequest {
+            name: "msg2 from client".to_string(),
+        },
+        HelloRequest {
+            name: "msg3 from client".to_string(),
+        },
+    ];
+    let req = futures_util::stream::iter(data);
+
+    let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap();
+
+    let (_parts, mut body) = bidi_resp.into_parts();
+    // let trailer = body.trailer().await.unwrap();
+    // println!("trailer: {:?}", trailer);
+    while let Some(item) = body.next().await {
+        match item {
+            Ok(v) => {
+                println!("reply: {:?}", v);
+            }
+            Err(err) => {
+                println!("err: {:?}", err);
+            }
+        }
+    }
+    let trailer = body.trailer().await.unwrap();
+    println!("trailer: {:?}", trailer);
+}
+
+type ResponseStream = Pin<Box<dyn Stream<Item = Result<HelloReply, 
tonic::Status>> + Send>>;
+
+#[tokio::test]
+async fn test_server() {
+    use std::net::ToSocketAddrs;
+    use tokio::time::Duration;
+
+    let esi = EchoServer::<EchoServerImpl>::new(EchoServerImpl {
+        name: "echo server impl".to_string(),
+    });
+    // esi.set_proxy_impl(TripleInvoker);
+
+    let name = "echoServer".to_string();
+    println!("server listening, 0.0.0.0:8888");
+    triple::transport::DubboServer::new()
+        .add_service(name.clone(), esi)
+        .with_http2_keepalive_timeout(Duration::from_secs(60))
+        .serve(
+            name.clone(),
+            "0.0.0.0:8888".to_socket_addrs().unwrap().next().unwrap(),
+        )
+        .await
+        .unwrap();
+    // server.add_service(esi.into());
+}
+
+#[tokio::test]
+async fn test_triple_protocol() {
+    use crate::common::url::Url;
+    use crate::protocol::triple::triple_protocol::TripleProtocol;
+    use crate::protocol::Protocol;
+    use crate::utils::boxed_clone::BoxCloneService;
+
+    // crate::init::init();
+
+    let esi = EchoServer::<EchoServerImpl>::new(EchoServerImpl {
+        name: "echo".to_string(),
+    });
+
+    crate::protocol::triple::TRIPLE_SERVICES
+        .write()
+        .unwrap()
+        .insert("echo".to_string(), BoxCloneService::new(esi));
+
+    println!("triple server running, url: 0.0.0.0:8888");
+    let pro = TripleProtocol::new();
+    pro.export(Url {
+        url: "0.0.0.0:8888".to_string(),
+        service_key: "echo".to_string(),
+    })
+    .await;
+}
+
+#[allow(dead_code)]
+#[derive(Default, Clone)]
+struct EchoServerImpl {
+    name: String,
+}
+
+// #[async_trait]
+#[async_trait]
+impl Echo for EchoServerImpl {
+    async fn hello(
+        &self,
+        req: Request<HelloRequest>,
+    ) -> Result<Response<HelloReply>, tonic::Status> {
+        println!("EchoServer::hello {:?}", req.message);
+
+        Ok(Response::new(HelloReply {
+            reply: "hello, dubbo-rust".to_string(),
+        }))
+    }
+
+    type BidirectionalStreamingEchoStream = ResponseStream;
+
+    async fn bidirectional_streaming_echo(
+        &self,
+        request: Request<triple::server::Streaming<HelloRequest>>,
+    ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, 
tonic::Status> {
+        println!("EchoServer::bidirectional_streaming_echo");
+
+        let mut in_stream = request.into_inner();
+        let (tx, rx) = mpsc::channel(128);
+
+        // this spawn here is required if you want to handle connection error.
+        // If we just map `in_stream` and write it back as `out_stream` the 
`out_stream`
+        // will be drooped when connection error occurs and error will never 
be propagated
+        // to mapped version of `in_stream`.
+        tokio::spawn(async move {
+            while let Some(result) = in_stream.next().await {
+                match result {
+                    Ok(v) => {
+                        // if v.name.starts_with("msg2") {
+                        //     
tx.send(Err(tonic::Status::internal(format!("err: args is invalid, {:?}", 
v.name))
+                        //     )).await.expect("working rx");
+                        //     continue;
+                        // }
+                        tx.send(Ok(HelloReply {
+                            reply: format!("server reply: {:?}", v.name),
+                        }))
+                        .await
+                        .expect("working rx")
+                    }
+                    Err(err) => {
+                        if let Some(io_err) = match_for_io_error(&err) {
+                            if io_err.kind() == ErrorKind::BrokenPipe {
+                                // here you can handle special case when client
+                                // disconnected in unexpected way
+                                eprintln!("\tclient disconnected: broken 
pipe");
+                                break;
+                            }
+                        }
+
+                        match tx.send(Err(err)).await {
+                            Ok(_) => (),
+                            Err(_err) => break, // response was droped
+                        }
+                    }
+                }
+            }
+            println!("\tstream ended");
+        });
+
+        // echo just write the same data that was received
+        let out_stream = ReceiverStream::new(rx);
+
+        Ok(Response::new(
+            Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
+        ))
+    }
+}
+
+fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
+    let mut err: &(dyn std::error::Error + 'static) = err_status;
+
+    loop {
+        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
+            return Some(io_err);
+        }
+
+        err = match err.source() {
+            Some(err) => err,
+            None => return None,
+        };
+    }
+}
diff --git a/dubbo/src/init.rs b/dubbo/src/init.rs
new file mode 100644
index 0000000..e0d738c
--- /dev/null
+++ b/dubbo/src/init.rs
@@ -0,0 +1,110 @@
+// /// Server 启动的入口。业务侧需要调用该函数进行初始化
+// ///
+// use std::collections::HashMap;
+
+// use crate::common::url::Url;
+// use config::{BusinessConfig, RootConfig};
+
+// pub fn init() {
+//     let _root_config = RootConfig::new().load();
+//     let service_config = ServiceConfig::default()
+//         .group("helloworld".to_string())
+//         .serializer("json".to_string())
+//         .version("1.0".to_string())
+//         .name("echo".to_string());
+
+//     let triple_config = ProtocolConfig::default()
+//         .name("triple".to_string())
+//         .ip("0.0.0.0".to_string())
+//         .port("8888".to_string());
+
+//     let _service_config = 
service_config.add_protocol_configs(triple_config);
+//     // 根据不同的协议,加载不同的配置
+//     // 初始化全局的services
+//     // let server = DubboServer::init();
+//     // let server = server.add_service("echo".to_string(), service);
+// }
+
+// #[derive(Default)]
+// pub struct ServiceConfig {
+//     version: String,
+//     group: String,
+//     name: String,
+//     protocol_names: Vec<String>,
+//     registry_names: Vec<String>,
+//     serializer: String,
+//     protocol_configs: HashMap<String, ProtocolConfig>,
+// }
+
+// impl ServiceConfig {
+//     pub fn name(self, name: String) -> Self {
+//         Self { name, ..self }
+//     }
+
+//     pub fn version(self, version: String) -> Self {
+//         Self { version, ..self }
+//     }
+
+//     pub fn group(self, group: String) -> Self {
+//         Self { group, ..self }
+//     }
+
+//     pub fn protocol_names(self, protocol_names: Vec<String>) -> Self {
+//         Self {
+//             protocol_names,
+//             ..self
+//         }
+//     }
+
+//     pub fn serializer(self, serializer: String) -> Self {
+//         Self { serializer, ..self }
+//     }
+
+//     pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) 
-> Self {
+//         self.protocol_configs
+//             .insert(protocol_config.name.clone(), protocol_config);
+//         Self { ..self }
+//     }
+
+//     pub fn get_url(&self) -> Vec<Url> {
+//         let mut urls = Vec::new();
+//         for (_, conf) in self.protocol_configs.iter() {
+//             urls.push(Url {
+//                 url: conf.to_owned().to_url(),
+//                 service_key: "".to_string(),
+//             });
+//         }
+
+//         urls
+//     }
+// }
+
+// #[derive(Default, Debug, Clone)]
+// pub struct ProtocolConfig {
+//     ip: String,
+//     port: String,
+//     name: String,
+//     params: HashMap<String, String>,
+// }
+
+// impl ProtocolConfig {
+//     pub fn name(self, name: String) -> Self {
+//         Self { name, ..self }
+//     }
+
+//     pub fn ip(self, ip: String) -> Self {
+//         Self { ip, ..self }
+//     }
+
+//     pub fn port(self, port: String) -> Self {
+//         Self { port, ..self }
+//     }
+
+//     pub fn params(self, params: HashMap<String, String>) -> Self {
+//         Self { params, ..self }
+//     }
+
+//     pub fn to_url(self) -> String {
+//         format!("{}://{}:{}", self.name, self.ip, self.port).to_string()
+//     }
+// }
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index e292e47..90ceb1c 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -16,6 +16,11 @@
  */
 
 pub mod common;
+pub mod echo;
 pub mod helloworld;
+pub mod init;
 pub mod protocol;
+pub mod registry;
 pub mod utils;
+
+pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
diff --git a/dubbo/src/main.rs b/dubbo/src/main.rs
index 906e5cb..db99336 100644
--- a/dubbo/src/main.rs
+++ b/dubbo/src/main.rs
@@ -16,11 +16,18 @@
  */
 
 pub mod common;
+pub mod echo;
 pub mod helloworld;
+pub mod init;
 pub mod protocol;
+pub mod registry;
 pub mod utils;
 
+pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
+
 #[tokio::main]
 async fn main() {
     println!("hello, dubbo-rust~")
 }
+
+// Refer service: 
tri://localhost:20000/helloworld.Greeter?app.version=3.0.0&application=dubbo.io&async=false&bean.name=GreeterClientImpl&cluster=failover&config.tracing=&environment=dev&generic=&group=&interface=helloworld.Greeter&loadbalance=&metadata-type=local&module=sample&name=dubbo.io&organization=dubbo-go&owner=dubbo-go&provided-by=&reference.filter=cshutdown&registry.role=0&release=dubbo-golang-3.0.0&retries=&serialization=&side=consumer&sticky=false&timestamp=1657865138&version=
diff --git a/dubbo/src/protocol/grpc/grpc_protocol.rs 
b/dubbo/src/protocol/grpc/grpc_protocol.rs
index b33468c..bf63a78 100644
--- a/dubbo/src/protocol/grpc/grpc_protocol.rs
+++ b/dubbo/src/protocol/grpc/grpc_protocol.rs
@@ -53,7 +53,7 @@ impl Protocol for GrpcProtocol {
         todo!()
     }
 
-    async fn refer(&self, url: Url) -> Self::Invoker {
+    async fn refer(self, url: Url) -> Self::Invoker {
         GrpcInvoker::new(url)
     }
 
diff --git a/dubbo/src/protocol/invocation.rs b/dubbo/src/protocol/invocation.rs
index a0686b2..50ca421 100644
--- a/dubbo/src/protocol/invocation.rs
+++ b/dubbo/src/protocol/invocation.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+use futures_core::Stream;
 use tonic::metadata::MetadataMap;
 
 pub struct Request<T> {
@@ -30,9 +31,21 @@ impl<T> Request<T> {
         }
     }
 
+    pub fn into_inner(self) -> T {
+        self.message
+    }
+
     pub fn into_parts(self) -> (MetadataMap, T) {
         (self.metadata, self.message)
     }
+
+    pub fn from_http(req: http::Request<T>) -> Self {
+        let (parts, body) = req.into_parts();
+        Request {
+            metadata: MetadataMap::from_headers(parts.headers),
+            message: body,
+        }
+    }
 }
 
 pub struct Response<T> {
@@ -55,4 +68,35 @@ impl<T> Response<T> {
     pub fn into_parts(self) -> (MetadataMap, T) {
         (self.metadata, self.message)
     }
+
+    pub fn map<F, U>(self, f: F) -> Response<U>
+    where
+        F: FnOnce(T) -> U,
+    {
+        let u = f(self.message);
+        Response {
+            message: u,
+            metadata: self.metadata,
+        }
+    }
+}
+
+pub trait IntoStreamingRequest {
+    type Stream: Stream<Item = Self::Message> + Send + 'static;
+    type Message;
+
+    fn into_streaming_request(self) -> Request<Self::Stream>;
+}
+
+impl<T> IntoStreamingRequest for T
+where
+    T: Stream + Send + 'static,
+{
+    type Stream = T;
+
+    type Message = T::Item;
+
+    fn into_streaming_request(self) -> Request<Self::Stream> {
+        Request::new(self)
+    }
 }
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 4199142..807c251 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -18,9 +18,12 @@
 pub mod grpc;
 pub mod invocation;
 pub mod server_desc;
+pub mod triple;
 
 use async_trait::async_trait;
 
+use crate::utils::boxed_clone::BoxCloneService;
+
 use crate::common::url::Url;
 
 #[async_trait]
@@ -30,7 +33,7 @@ pub trait Protocol {
 
     fn destroy(&self);
     async fn export(self, url: Url) -> Self::Exporter;
-    async fn refer(&self, url: Url) -> Self::Invoker;
+    async fn refer(self, url: Url) -> Self::Invoker;
 }
 
 pub trait Exporter {
@@ -53,3 +56,9 @@ pub trait DubboGrpcService<T> {
     fn set_proxy_impl(&mut self, invoker: T);
     fn service_desc(&self) -> server_desc::ServiceDesc;
 }
+
+pub type GrpcBoxCloneService = BoxCloneService<
+    http::Request<hyper::Body>,
+    http::Response<hyper::Body>,
+    std::convert::Infallible,
+>;
diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs
new file mode 100644
index 0000000..32728c3
--- /dev/null
+++ b/dubbo/src/protocol/triple/mod.rs
@@ -0,0 +1,21 @@
+pub mod triple_exporter;
+pub mod triple_invoker;
+pub mod triple_protocol;
+pub mod triple_server;
+
+use lazy_static::lazy_static;
+use std::collections::HashMap;
+use std::sync::RwLock;
+
+use crate::utils::boxed_clone::BoxCloneService;
+use triple::BoxBody;
+
+pub type GrpcBoxCloneService =
+    BoxCloneService<http::Request<hyper::Body>, http::Response<BoxBody>, 
std::convert::Infallible>;
+
+lazy_static! {
+    // pub static ref DUBBO_GRPC_SERVICES: RwLock<HashMap<String, Box<dyn 
DubboGrpcService<GrpcInvoker> + Send + Sync + 'static>>> =
+    //     RwLock::new(HashMap::new());
+    pub static ref TRIPLE_SERVICES: RwLock<HashMap<String, 
GrpcBoxCloneService>> =
+        RwLock::new(HashMap::new());
+}
diff --git a/dubbo/src/protocol/triple/triple_exporter.rs 
b/dubbo/src/protocol/triple/triple_exporter.rs
new file mode 100644
index 0000000..5f66863
--- /dev/null
+++ b/dubbo/src/protocol/triple/triple_exporter.rs
@@ -0,0 +1,23 @@
+use super::triple_invoker::TripleInvoker;
+use crate::protocol::Exporter;
+
+#[derive(Clone)]
+pub struct TripleExporter {}
+
+impl TripleExporter {
+    pub fn new() -> Self {
+        TripleExporter {}
+    }
+}
+
+impl Exporter for TripleExporter {
+    type InvokerType = TripleInvoker;
+
+    fn unexport(&self) {
+        todo!()
+    }
+
+    fn get_invoker(&self) -> Self::InvokerType {
+        todo!()
+    }
+}
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs 
b/dubbo/src/protocol/triple/triple_invoker.rs
new file mode 100644
index 0000000..9f064ab
--- /dev/null
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -0,0 +1,38 @@
+use crate::common::url::Url;
+use crate::protocol::Invoker;
+
+#[allow(dead_code)]
+#[derive(Clone, Default)]
+pub struct TripleInvoker {
+    url: Url,
+}
+
+impl TripleInvoker {
+    pub fn new(url: Url) -> TripleInvoker {
+        Self { url }
+    }
+}
+
+impl Invoker for TripleInvoker {
+    fn invoke<M1>(
+        &self,
+        _req: crate::protocol::invocation::Request<M1>,
+    ) -> crate::protocol::invocation::Response<String>
+    where
+        M1: Send + 'static,
+    {
+        todo!()
+    }
+
+    fn is_available(&self) -> bool {
+        todo!()
+    }
+
+    fn destroy(&self) {
+        todo!()
+    }
+
+    fn get_url(&self) -> Url {
+        todo!()
+    }
+}
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs 
b/dubbo/src/protocol/triple/triple_protocol.rs
new file mode 100644
index 0000000..876f06c
--- /dev/null
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -0,0 +1,48 @@
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+
+use super::triple_exporter::TripleExporter;
+use super::triple_invoker::TripleInvoker;
+use super::triple_server::TripleServer;
+use crate::common::url::Url;
+use crate::protocol::Protocol;
+
+pub struct TripleProtocol {
+    servers: HashMap<String, TripleServer>,
+}
+
+impl TripleProtocol {
+    pub fn new() -> Self {
+        TripleProtocol {
+            servers: HashMap::new(),
+        }
+    }
+
+    pub fn get_server(&self, name: String) -> Option<TripleServer> {
+        self.servers.get(&name).map(|data| data.to_owned())
+    }
+}
+
+#[async_trait]
+impl Protocol for TripleProtocol {
+    type Invoker = TripleInvoker;
+
+    type Exporter = TripleExporter;
+
+    fn destroy(&self) {
+        todo!()
+    }
+
+    async fn export(self, url: Url) -> Self::Exporter {
+        let server = TripleServer::new(url.service_key.clone());
+        server.serve(url.url.clone()).await;
+
+        TripleExporter::new()
+    }
+
+    async fn refer(self, url: Url) -> Self::Invoker {
+        TripleInvoker::new(url)
+        // Self::Invoker
+    }
+}
diff --git a/dubbo/src/protocol/triple/triple_server.rs 
b/dubbo/src/protocol/triple/triple_server.rs
new file mode 100644
index 0000000..b596bb9
--- /dev/null
+++ b/dubbo/src/protocol/triple/triple_server.rs
@@ -0,0 +1,44 @@
+use std::{net::ToSocketAddrs, str::FromStr};
+
+use triple::transport::DubboServer;
+
+#[derive(Default, Clone)]
+pub struct TripleServer {
+    s: DubboServer,
+    name: String,
+}
+
+impl TripleServer {
+    pub fn new(name: String) -> TripleServer {
+        Self {
+            name,
+            ..Default::default()
+        }
+    }
+
+    pub async fn serve(mut self, url: String) {
+        {
+            let lock = super::TRIPLE_SERVICES.read().unwrap();
+            let svc = lock.get(&self.name).unwrap();
+
+            self.s = self.s.add_service(self.name.clone(), svc.clone());
+        }
+
+        let uri = http::Uri::from_str(&url.clone()).unwrap();
+        let server = self.s.clone();
+
+        server
+            .serve(
+                self.name,
+                uri.authority()
+                    .unwrap()
+                    .to_string()
+                    .to_socket_addrs()
+                    .unwrap()
+                    .next()
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+    }
+}
diff --git a/dubbo/Cargo.toml b/triple/Cargo.toml
similarity index 66%
copy from dubbo/Cargo.toml
copy to triple/Cargo.toml
index 7ce6865..df17abd 100644
--- a/dubbo/Cargo.toml
+++ b/triple/Cargo.toml
@@ -1,23 +1,20 @@
 [package]
-name = "dubbo"
+name = "triple"
 version = "0.1.0"
 edition = "2021"
 
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
-[[bin]]
-name = "helloworld-client"
-path = "src/helloworld/client.rs"
-
 [dependencies]
 h2 = {version = "0.3", optional = true}
-hyper = "0.14.19"
+hyper = {version="0.14.19", features = ["full"]}
 http = "0.2"
 tonic = {version ="0.7.2", features = ["compression",]}
 tower-service = "0.3.1"
 http-body = "0.4.4"
 tower = "0.4.12"
 futures-util = {version = "0.3", default-features = false}
+futures-core = {version = "0.3", default-features = false}
 tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", 
"macros", "net", "signal"] }
 prost-derive = {version = "0.10", optional = true}
 prost = "0.10.4"
@@ -25,4 +22,13 @@ prost-types = { version = "0.6", default-features = false }
 lazy_static = "1.3.0"
 async-trait = "0.1.56"
 tower-layer = "0.3"
+pin-project = "1.0"
+axum = "0.5.9"
+bytes = "1.0"
+serde_json = "1.0.82"
+serde = {version="1.0.138", features = ["derive"]}
+async-stream = "0.3"
+tokio-stream = "0.1"
 
+# for test in codegen
+config = {path = "../config"}
\ No newline at end of file
diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs
new file mode 100644
index 0000000..5126ed1
--- /dev/null
+++ b/triple/src/client/grpc.rs
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use futures_util::{StreamExt, TryStreamExt};
+use http::HeaderValue;
+
+use crate::codec::Codec;
+use crate::invocation::{IntoStreamingRequest, Response};
+use crate::server::encode::encode;
+use crate::server::Streaming;
+
+#[derive(Debug, Clone, Default)]
+pub struct TripleClient {
+    host: Option<http::uri::Authority>,
+    inner: ConnectionPool,
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct ConnectionPool {
+    http2_only: bool,
+}
+
+impl ConnectionPool {
+    pub fn new() -> Self {
+        ConnectionPool { http2_only: true }
+    }
+
+    pub fn builder(self) -> hyper::Client<hyper::client::HttpConnector> {
+        hyper::Client::builder()
+            .http2_only(self.http2_only)
+            .build_http()
+    }
+}
+
+// TODO: initial connection pool
+impl TripleClient {
+    pub fn new() -> Self {
+        TripleClient {
+            host: None,
+            inner: ConnectionPool::new(),
+        }
+    }
+
+    pub fn with_authority(self, host: http::uri::Authority) -> Self {
+        TripleClient {
+            host: Some(host),
+            ..self
+        }
+    }
+}
+
+impl TripleClient {
+    pub async fn bidi_streaming<C, M1, M2>(
+        &mut self,
+        req: impl IntoStreamingRequest<Message = M1>,
+        mut codec: C,
+        path: http::uri::PathAndQuery,
+    ) -> Result<Response<Streaming<M2>>, tonic::Status>
+    where
+        C: Codec<Encode = M1, Decode = M2>,
+        M1: Send + Sync + 'static,
+        M2: Send + Sync + 'static,
+    {
+        let req = req.into_streaming_request();
+        let en = encode(codec.encoder(), 
req.into_inner().map(Ok)).into_stream();
+        let body = hyper::Body::wrap_stream(en);
+
+        let mut parts = http::uri::Parts::default();
+        parts.path_and_query = Some(path);
+        parts.authority = self.host.clone();
+        parts.scheme = Some(http::uri::Scheme::HTTP);
+
+        let uri = http::Uri::from_parts(parts).unwrap();
+
+        let mut req = hyper::Request::builder()
+            .version(http::Version::HTTP_2)
+            .uri(uri.clone())
+            .method("POST")
+            .body(body)
+            .unwrap();
+        *req.version_mut() = http::Version::HTTP_2;
+        req.headers_mut()
+            .insert("method", HeaderValue::from_static("POST"));
+        req.headers_mut().insert(
+            "scheme",
+            HeaderValue::from_str(uri.clone().scheme_str().unwrap()).unwrap(),
+        );
+        req.headers_mut()
+            .insert("path", 
HeaderValue::from_str(uri.clone().path()).unwrap());
+        req.headers_mut().insert(
+            "authority",
+            HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(),
+        );
+        req.headers_mut().insert(
+            "content-type",
+            HeaderValue::from_static("application/grpc+json"),
+        );
+        req.headers_mut()
+            .insert("user-agent", 
HeaderValue::from_static("dubbo-rust/1.0.0"));
+        req.headers_mut()
+            .insert("te", HeaderValue::from_static("trailers"));
+        req.headers_mut().insert(
+            "tri-service-version",
+            HeaderValue::from_static("dubbo-rust/1.0.0"),
+        );
+        req.headers_mut()
+            .insert("tri-service-group", HeaderValue::from_static("cluster"));
+        req.headers_mut().insert(
+            "tri-unit-info",
+            HeaderValue::from_static("dubbo-rust/1.0.0"),
+        );
+
+        // const (
+        //     TripleContentType    = "application/grpc+proto"
+        //     TripleUserAgent      = "grpc-go/1.35.0-dev"
+        //     TripleServiceVersion = "tri-service-version"
+        //     TripleAttachement    = "tri-attachment"
+        //     TripleServiceGroup   = "tri-service-group"
+        //     TripleRequestID      = "tri-req-id"
+        //     TripleTraceID        = "tri-trace-traceid"
+        //     TripleTraceRPCID     = "tri-trace-rpcid"
+        //     TripleTraceProtoBin  = "tri-trace-proto-bin"
+        //     TripleUnitInfo       = "tri-unit-info"
+        // )
+
+        let cli = self.inner.clone().builder();
+        let response = cli.request(req).await;
+
+        match response {
+            Ok(v) => {
+                let resp = v.map(|body| Streaming::new(body, codec.decoder()));
+
+                let (_parts, body) = resp.into_parts();
+                return Ok(Response::new(body));
+            }
+            Err(err) => {
+                return Err(tonic::Status::new(tonic::Code::Internal, 
err.to_string()));
+            }
+        }
+    }
+}
diff --git a/dubbo/src/lib.rs b/triple/src/client/mod.rs
similarity index 92%
copy from dubbo/src/lib.rs
copy to triple/src/client/mod.rs
index e292e47..5b2603e 100644
--- a/dubbo/src/lib.rs
+++ b/triple/src/client/mod.rs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-pub mod common;
-pub mod helloworld;
-pub mod protocol;
-pub mod utils;
+pub mod grpc;
+
+pub use grpc::TripleClient;
diff --git a/triple/src/codec/buffer.rs b/triple/src/codec/buffer.rs
new file mode 100644
index 0000000..850f988
--- /dev/null
+++ b/triple/src/codec/buffer.rs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use bytes::buf::UninitSlice;
+use bytes::{Buf, BufMut, BytesMut};
+
+/// A specialized buffer to decode gRPC messages from.
+#[derive(Debug)]
+pub struct DecodeBuf<'a> {
+    buf: &'a mut BytesMut,
+    len: usize,
+}
+
+/// A specialized buffer to encode gRPC messages into.
+#[derive(Debug)]
+pub struct EncodeBuf<'a> {
+    buf: &'a mut BytesMut,
+}
+
+impl<'a> DecodeBuf<'a> {
+    pub fn new(buf: &'a mut BytesMut, len: usize) -> Self {
+        DecodeBuf { buf, len }
+    }
+}
+
+impl Buf for DecodeBuf<'_> {
+    #[inline]
+    fn remaining(&self) -> usize {
+        self.len
+    }
+
+    #[inline]
+    fn chunk(&self) -> &[u8] {
+        let ret = self.buf.chunk();
+
+        if ret.len() > self.len {
+            &ret[..self.len]
+        } else {
+            ret
+        }
+    }
+
+    #[inline]
+    fn advance(&mut self, cnt: usize) {
+        assert!(cnt <= self.len);
+        self.buf.advance(cnt);
+        self.len -= cnt;
+    }
+}
+
+impl<'a> EncodeBuf<'a> {
+    pub fn new(buf: &'a mut BytesMut) -> Self {
+        EncodeBuf { buf }
+    }
+}
+
+impl EncodeBuf<'_> {
+    /// Reserves capacity for at least `additional` more bytes to be inserted
+    /// into the buffer.
+    ///
+    /// More than `additional` bytes may be reserved in order to avoid frequent
+    /// reallocations. A call to `reserve` may result in an allocation.
+    #[inline]
+    pub fn reserve(&mut self, additional: usize) {
+        self.buf.reserve(additional);
+    }
+}
+
+unsafe impl BufMut for EncodeBuf<'_> {
+    #[inline]
+    fn remaining_mut(&self) -> usize {
+        self.buf.remaining_mut()
+    }
+
+    #[inline]
+    unsafe fn advance_mut(&mut self, cnt: usize) {
+        self.buf.advance_mut(cnt)
+    }
+
+    #[inline]
+    fn chunk_mut(&mut self) -> &mut UninitSlice {
+        self.buf.chunk_mut()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn decode_buf() {
+        let mut payload = BytesMut::with_capacity(100);
+        payload.put(&vec![0u8; 50][..]);
+        let mut buf = DecodeBuf::new(&mut payload, 20);
+
+        assert_eq!(buf.len, 20);
+        assert_eq!(buf.remaining(), 20);
+        assert_eq!(buf.chunk().len(), 20);
+
+        buf.advance(10);
+        assert_eq!(buf.remaining(), 10);
+
+        let mut out = [0; 5];
+        buf.copy_to_slice(&mut out);
+        assert_eq!(buf.remaining(), 5);
+        assert_eq!(buf.chunk().len(), 5);
+
+        assert_eq!(buf.copy_to_bytes(5).len(), 5);
+        assert!(!buf.has_remaining());
+    }
+
+    #[test]
+    fn encode_buf() {
+        let mut bytes = BytesMut::with_capacity(100);
+        let mut buf = EncodeBuf::new(&mut bytes);
+
+        let initial = buf.remaining_mut();
+        unsafe { buf.advance_mut(20) };
+        assert_eq!(buf.remaining_mut(), initial - 20);
+
+        buf.put_u8(b'a');
+        assert_eq!(buf.remaining_mut(), initial - 20 - 1);
+    }
+}
diff --git a/triple/src/codec/mod.rs b/triple/src/codec/mod.rs
new file mode 100644
index 0000000..f6e61d9
--- /dev/null
+++ b/triple/src/codec/mod.rs
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod buffer;
+pub mod serde_codec;
+
+use std::io;
+
+pub use self::buffer::{DecodeBuf, EncodeBuf};
+use tonic::Status;
+
+pub trait Codec {
+    /// The encodable message.
+    type Encode: Send + 'static;
+    /// The decodable message.
+    type Decode: Send + 'static;
+
+    /// The encoder that can encode a message.
+    type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + 
'static;
+    /// The encoder that can decode a message.
+    type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + 
'static;
+
+    /// Fetch the encoder.
+    fn encoder(&mut self) -> Self::Encoder;
+    /// Fetch the decoder.
+    fn decoder(&mut self) -> Self::Decoder;
+}
+
+/// Encodes gRPC message types
+pub trait Encoder {
+    /// The type that is encoded.
+    type Item;
+
+    /// The type of encoding errors.
+    ///
+    /// The type of unrecoverable frame encoding errors.
+    type Error: From<io::Error>;
+
+    /// Encodes a message into the provided buffer.
+    fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> 
Result<(), Self::Error>;
+}
+
+/// Decodes gRPC message types
+pub trait Decoder {
+    /// The type that is decoded.
+    type Item;
+
+    /// The type of unrecoverable frame decoding errors.
+    type Error: From<io::Error>;
+
+    /// Decode a message from the buffer.
+    ///
+    /// The buffer will contain exactly the bytes of a full message. There
+    /// is no need to get the length from the bytes, gRPC framing is handled
+    /// for you.
+    fn decode(&mut self, src: &mut DecodeBuf<'_>) -> 
Result<Option<Self::Item>, Self::Error>;
+}
diff --git a/triple/src/codec/serde_codec.rs b/triple/src/codec/serde_codec.rs
new file mode 100644
index 0000000..143eb56
--- /dev/null
+++ b/triple/src/codec/serde_codec.rs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::marker::PhantomData;
+
+use bytes::{Buf, BufMut};
+use serde::{Deserialize, Serialize};
+
+use super::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder};
+
+#[derive(Debug)]
+pub struct SerdeCodec<T, U> {
+    _pd: PhantomData<(T, U)>,
+}
+
+impl<T, U> Default for SerdeCodec<T, U> {
+    fn default() -> Self {
+        Self { _pd: PhantomData }
+    }
+}
+
+impl<'a, T, U> Codec for SerdeCodec<T, U>
+where
+    T: Serialize + Send + 'static,
+    U: Deserialize<'a> + Send + 'static,
+{
+    type Encode = T;
+
+    type Decode = U;
+
+    type Encoder = SerdeEncoder<T>;
+
+    type Decoder = SerdeDecoder<U>;
+
+    fn encoder(&mut self) -> Self::Encoder {
+        SerdeEncoder(PhantomData)
+    }
+
+    fn decoder(&mut self) -> Self::Decoder {
+        SerdeDecoder(PhantomData)
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SerdeEncoder<T>(PhantomData<T>);
+
+impl<T: Serialize> Encoder for SerdeEncoder<T> {
+    type Item = T;
+
+    type Error = tonic::Status;
+
+    fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> 
Result<(), Self::Error> {
+        item.serialize(&mut serde_json::Serializer::new(dst.writer()))
+            .expect("failed to searialize");
+
+        Ok(())
+    }
+}
+
+pub struct SerdeDecoder<U>(PhantomData<U>);
+
+impl<'a, U: Deserialize<'a>> Decoder for SerdeDecoder<U> {
+    type Item = U;
+
+    type Error = tonic::Status;
+
+    fn decode(&mut self, src: &mut DecodeBuf<'_>) -> 
Result<Option<Self::Item>, Self::Error> {
+        let value = src.chunk().to_owned();
+        let mut msg = vec![0u8; value.len()];
+        src.copy_to_slice(&mut msg);
+
+        let mut de = serde_json::Deserializer::from_reader(msg.reader());
+        Ok(Some(U::deserialize(&mut de).unwrap()))
+    }
+}
diff --git a/triple/src/invocation.rs b/triple/src/invocation.rs
new file mode 100644
index 0000000..1655312
--- /dev/null
+++ b/triple/src/invocation.rs
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use futures_core::Stream;
+use tonic::metadata::MetadataMap;
+
+pub struct Request<T> {
+    pub message: T,
+    pub metadata: MetadataMap,
+}
+
+impl<T> Request<T> {
+    pub fn new(message: T) -> Request<T> {
+        Self {
+            message,
+            metadata: MetadataMap::new(),
+        }
+    }
+
+    pub fn into_inner(self) -> T {
+        self.message
+    }
+
+    pub fn into_parts(self) -> (MetadataMap, T) {
+        (self.metadata, self.message)
+    }
+
+    pub fn from_http(req: http::Request<T>) -> Self {
+        let (parts, body) = req.into_parts();
+        Request {
+            metadata: MetadataMap::from_headers(parts.headers),
+            message: body,
+        }
+    }
+
+    pub fn into_http(self) -> http::Request<T> {
+        let mut http_req = http::Request::new(self.message);
+        *http_req.version_mut() = http::Version::HTTP_2;
+        *http_req.headers_mut() = self.metadata.into_headers();
+
+        http_req
+    }
+}
+
+pub struct Response<T> {
+    message: T,
+    metadata: MetadataMap,
+}
+
+impl<T> Response<T> {
+    pub fn new(message: T) -> Response<T> {
+        Self {
+            message,
+            metadata: MetadataMap::new(),
+        }
+    }
+
+    pub fn from_parts(metadata: MetadataMap, message: T) -> Self {
+        Self { message, metadata }
+    }
+
+    pub fn into_parts(self) -> (MetadataMap, T) {
+        (self.metadata, self.message)
+    }
+
+    pub fn into_http(self) -> http::Response<T> {
+        let mut http_resp = http::Response::new(self.message);
+        *http_resp.version_mut() = http::Version::HTTP_2;
+        *http_resp.headers_mut() = self.metadata.into_headers();
+
+        http_resp
+    }
+
+    pub fn map<F, U>(self, f: F) -> Response<U>
+    where
+        F: FnOnce(T) -> U,
+    {
+        let u = f(self.message);
+        Response {
+            message: u,
+            metadata: self.metadata,
+        }
+    }
+}
+
+pub trait IntoStreamingRequest {
+    type Stream: Stream<Item = Self::Message> + Send + 'static;
+    type Message;
+
+    fn into_streaming_request(self) -> Request<Self::Stream>;
+}
+
+impl<T> IntoStreamingRequest for T
+where
+    T: Stream + Send + 'static,
+    // T::Item: Result<Self::Message, std::convert::Infallible>,
+{
+    type Stream = T;
+
+    type Message = T::Item;
+
+    fn into_streaming_request(self) -> Request<Self::Stream> {
+        Request::new(self)
+    }
+}
+
+// impl<T> sealed::Sealed for T {}
+
+// pub mod sealed {
+//     pub trait Sealed {}
+// }
diff --git a/dubbo/src/main.rs b/triple/src/lib.rs
similarity index 67%
copy from dubbo/src/main.rs
copy to triple/src/lib.rs
index 906e5cb..42a3c6d 100644
--- a/dubbo/src/main.rs
+++ b/triple/src/lib.rs
@@ -15,12 +15,20 @@
  * limitations under the License.
  */
 
-pub mod common;
-pub mod helloworld;
-pub mod protocol;
-pub mod utils;
+pub mod client;
+pub mod codec;
+pub mod invocation;
+pub mod server;
+pub mod transport;
 
-#[tokio::main]
-async fn main() {
-    println!("hello, dubbo-rust~")
+use http_body::Body;
+
+pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
+
+pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, 
tonic::Status>;
+
+pub fn empty_body() -> BoxBody {
+    http_body::Empty::new()
+        .map_err(|err| match err {})
+        .boxed_unsync()
 }
diff --git a/dubbo/src/main.rs b/triple/src/main.rs
similarity index 65%
copy from dubbo/src/main.rs
copy to triple/src/main.rs
index 906e5cb..4350696 100644
--- a/dubbo/src/main.rs
+++ b/triple/src/main.rs
@@ -15,12 +15,22 @@
  * limitations under the License.
  */
 
-pub mod common;
-pub mod helloworld;
-pub mod protocol;
-pub mod utils;
+pub mod client;
+pub mod codec;
+pub mod invocation;
+pub mod server;
+pub mod transport;
 
-#[tokio::main]
-async fn main() {
-    println!("hello, dubbo-rust~")
+use http_body::Body;
+
+pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
+pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, 
tonic::Status>;
+pub fn empty_body() -> BoxBody {
+    http_body::Empty::new()
+        .map_err(|err| match err {})
+        .boxed_unsync()
+}
+
+fn main() {
+    println!("hello, triple");
 }
diff --git a/dubbo/src/lib.rs b/triple/src/server/consts.rs
similarity index 80%
copy from dubbo/src/lib.rs
copy to triple/src/server/consts.rs
index e292e47..4898211 100644
--- a/dubbo/src/lib.rs
+++ b/triple/src/server/consts.rs
@@ -15,7 +15,10 @@
  * limitations under the License.
  */
 
-pub mod common;
-pub mod helloworld;
-pub mod protocol;
-pub mod utils;
+pub const BUFFER_SIZE: usize = 1024 * 8;
+// 5 bytes
+pub const HEADER_SIZE: usize =
+    // compression flag
+    std::mem::size_of::<u8>() +
+    // data length
+    std::mem::size_of::<u32>();
diff --git a/triple/src/server/decode.rs b/triple/src/server/decode.rs
new file mode 100644
index 0000000..06ce3ae
--- /dev/null
+++ b/triple/src/server/decode.rs
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::{pin::Pin, task::Poll};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use futures_util::Stream;
+use futures_util::{future, ready};
+use http_body::Body;
+use tonic::metadata::MetadataMap;
+
+use crate::codec::{DecodeBuf, Decoder};
+
+type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, tonic::Status>;
+
+pub struct Streaming<T> {
+    state: State,
+    body: BoxBody,
+    decoder: Box<dyn Decoder<Item = T, Error = tonic::Status> + Send + 
'static>,
+    buf: BytesMut,
+    trailers: Option<MetadataMap>,
+}
+
+#[derive(PartialEq)]
+enum State {
+    ReadHeader,
+    ReadBody { len: usize },
+    Error,
+}
+
+impl<T> Streaming<T> {
+    pub fn new<B, D>(body: B, decoder: D) -> Self
+    where
+        B: Body + Send + 'static,
+        B::Error: Into<crate::Error>,
+        D: Decoder<Item = T, Error = tonic::Status> + Send + 'static,
+    {
+        Self {
+            state: State::ReadHeader,
+            body: body
+                .map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
+                .map_err(|_err| tonic::Status::internal("err"))
+                .boxed_unsync(),
+            decoder: Box::new(decoder),
+            buf: BytesMut::with_capacity(super::consts::BUFFER_SIZE),
+            trailers: None,
+        }
+    }
+
+    pub async fn message(&mut self) -> Result<Option<T>, tonic::Status> {
+        match future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await {
+            Some(Ok(res)) => Ok(Some(res)),
+            Some(Err(err)) => Err(err),
+            None => Ok(None),
+        }
+    }
+
+    pub async fn trailer(&mut self) -> Result<Option<MetadataMap>, 
tonic::Status> {
+        if let Some(t) = self.trailers.take() {
+            return Ok(Some(t));
+        }
+        // while self.message().await?.is_some() {}
+
+        let trailer = future::poll_fn(|cx| Pin::new(&mut 
self.body).poll_trailers(cx)).await;
+        let trailer = trailer.map(|data| data.map(MetadataMap::from_headers));
+        trailer
+    }
+
+    pub fn decode_chunk(&mut self) -> Result<Option<T>, tonic::Status> {
+        if self.state == State::ReadHeader {
+            // buffer is full
+            if self.buf.remaining() < super::consts::HEADER_SIZE {
+                return Ok(None);
+            }
+
+            let _is_compressed = self.buf.get_u8();
+            let len = self.buf.get_u32() as usize;
+            self.buf.reserve(len as usize);
+
+            self.state = State::ReadBody { len }
+        }
+
+        if let State::ReadBody { len } = self.state {
+            if self.buf.remaining() < len || self.buf.len() < len {
+                return Ok(None);
+            }
+
+            let decoding_result = self.decoder.decode(&mut DecodeBuf::new(&mut 
self.buf, len));
+
+            return match decoding_result {
+                Ok(Some(r)) => {
+                    self.state = State::ReadHeader;
+                    Ok(Some(r))
+                }
+                Ok(None) => Ok(None),
+                Err(err) => Err(err),
+            };
+        }
+
+        Ok(None)
+    }
+}
+
+impl<T> Stream for Streaming<T> {
+    type Item = Result<T, tonic::Status>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        loop {
+            if self.state == State::Error {
+                return Poll::Ready(None);
+            }
+
+            if let Some(item) = self.decode_chunk()? {
+                return Poll::Ready(Some(Ok(item)));
+            }
+
+            let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
+                Some(Ok(d)) => Some(d),
+                Some(Err(e)) => {
+                    let _ = std::mem::replace(&mut self.state, State::Error);
+                    let err: crate::Error = e.into();
+                    return 
Poll::Ready(Some(Err(tonic::Status::internal(err.to_string()))));
+                }
+                None => None,
+            };
+
+            if let Some(data) = chunk {
+                self.buf.put(data)
+            } else {
+                break;
+            }
+        }
+
+        match ready!(Pin::new(&mut self.body).poll_trailers(cx)) {
+            Ok(trailer) => {
+                self.trailers = trailer.map(MetadataMap::from_headers);
+            }
+            Err(err) => {
+                println!("poll_trailers, err: {}", err);
+            }
+        }
+
+        Poll::Ready(None)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+}
diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs
new file mode 100644
index 0000000..c76f448
--- /dev/null
+++ b/triple/src/server/encode.rs
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{pin::Pin, task::Poll};
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures_core::{Stream, TryStream};
+use futures_util::{ready, StreamExt, TryStreamExt};
+use http_body::Body;
+use pin_project::pin_project;
+use tonic::Status;
+
+use crate::codec::{EncodeBuf, Encoder};
+
+#[allow(unused_must_use)]
+pub fn encode<E, B>(mut encoder: E, resp_body: B) -> impl TryStream<Ok = 
Bytes, Error = Status>
+where
+    E: Encoder<Error = Status>,
+    B: Stream<Item = Result<E::Item, Status>>,
+{
+    async_stream::stream! {
+        let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE);
+        futures_util::pin_mut!(resp_body);
+
+        loop {
+            match resp_body.next().await {
+                Some(Ok(item)) => {
+                    // 编码数据到缓冲中
+                    buf.reserve(super::consts::HEADER_SIZE);
+                    unsafe {
+                        buf.advance_mut(super::consts::HEADER_SIZE);
+                    }
+                    encoder.encode(item, &mut EncodeBuf::new(&mut 
buf)).map_err(|_e| tonic::Status::internal("encode error"));
+
+                    let len = buf.len() - super::consts::HEADER_SIZE;
+                    {
+                        let mut buf = &mut buf[..super::consts::HEADER_SIZE];
+                        buf.put_u8(1);
+                        buf.put_u32(len as u32);
+                    }
+
+                    yield Ok(buf.split_to(len + 
super::consts::HEADER_SIZE).freeze());
+                },
+                Some(Err(err)) => yield Err(err.into()),
+                None => break,
+            }
+        }
+    }
+}
+
+pub fn encode_server<E, B>(
+    encoder: E,
+    body: B,
+) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
+where
+    E: Encoder<Error = Status>,
+    B: Stream<Item = Result<E::Item, Status>>,
+{
+    let s = encode(encoder, body).into_stream();
+    EncodeBody::new_server(s)
+}
+
+pub fn encode_client<E, B>(
+    encoder: E,
+    body: B,
+) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>>
+where
+    E: Encoder<Error = Status>,
+    B: Stream<Item = E::Item>,
+{
+    let s = encode(encoder, body.map(Ok)).into_stream();
+    EncodeBody::new_client(s)
+}
+
+#[derive(Debug)]
+enum Role {
+    Server,
+    Client,
+}
+#[pin_project]
+pub struct EncodeBody<S> {
+    #[pin]
+    inner: S,
+    role: Role,
+    is_end_stream: bool,
+    error: Option<tonic::Status>,
+}
+
+impl<S> EncodeBody<S> {
+    pub fn new_server(inner: S) -> Self {
+        Self {
+            inner,
+            role: Role::Server,
+            is_end_stream: false,
+            error: None,
+        }
+    }
+
+    pub fn new_client(inner: S) -> Self {
+        Self {
+            inner,
+            role: Role::Client,
+            is_end_stream: false,
+            error: None,
+        }
+    }
+}
+
+impl<S> Body for EncodeBody<S>
+where
+    S: Stream<Item = Result<Bytes, tonic::Status>>,
+{
+    type Data = Bytes;
+
+    type Error = tonic::Status;
+
+    fn is_end_stream(&self) -> bool {
+        self.is_end_stream
+    }
+
+    fn poll_data(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+        let mut self_proj = self.project();
+        match ready!(self_proj.inner.try_poll_next_unpin(cx)) {
+            Some(Ok(d)) => Some(Ok(d)).into(),
+            Some(Err(status)) => {
+                *self_proj.error = Some(status);
+                None.into()
+            }
+            None => None.into(),
+        }
+    }
+
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
+        let self_proj = self.project();
+        if *self_proj.is_end_stream {
+            return Poll::Ready(Ok(None));
+        }
+
+        let status = if let Some(status) = self_proj.error.take() {
+            *self_proj.is_end_stream = true;
+            status
+        } else {
+            tonic::Status::ok("")
+        };
+        let http = status.to_http();
+        println!("status: {:?}", http.headers().clone());
+
+        Poll::Ready(Ok(Some(http.headers().to_owned())))
+    }
+}
diff --git a/dubbo/src/main.rs b/triple/src/server/mod.rs
similarity index 84%
copy from dubbo/src/main.rs
copy to triple/src/server/mod.rs
index 906e5cb..8be7239 100644
--- a/dubbo/src/main.rs
+++ b/triple/src/server/mod.rs
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-pub mod common;
-pub mod helloworld;
-pub mod protocol;
-pub mod utils;
+pub mod consts;
+pub mod decode;
+pub mod encode;
+pub mod server;
+pub mod service;
 
-#[tokio::main]
-async fn main() {
-    println!("hello, dubbo-rust~")
-}
+pub use decode::Streaming;
+pub use encode::{encode, encode_server};
diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs
new file mode 100644
index 0000000..6474d6c
--- /dev/null
+++ b/triple/src/server/server.rs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use bytes::{Buf, BytesMut};
+use http_body::Body;
+use std::fmt::Debug;
+
+use crate::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder};
+use crate::invocation::Request;
+use crate::server::encode::encode_server;
+use crate::server::service::{StreamingSvc, UnaryService};
+use crate::server::Streaming;
+use crate::BoxBody;
+use config::BusinessConfig;
+
+pub struct TripleServer<T> {
+    codec: T,
+}
+
+impl<T> TripleServer<T> {
+    pub fn new(codec: T) -> Self {
+        Self { codec }
+    }
+}
+
+impl<T> TripleServer<T>
+where
+    T: Codec,
+{
+    pub async fn bidi_streaming<S, B>(
+        &mut self,
+        mut service: S,
+        req: http::Request<B>,
+    ) -> http::Response<BoxBody>
+    where
+        S: StreamingSvc<T::Decode, Response = T::Encode>,
+        S::ResponseStream: Send + 'static,
+        B: Body + Send + 'static,
+        B::Error: Into<crate::Error> + Send,
+    {
+        let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder()));
+
+        let resp = service.call(Request::from_http(req_stream)).await;
+
+        let (mut parts, resp_body) = resp.unwrap().into_http().into_parts();
+        let resp_body = encode_server(self.codec.encoder(), resp_body);
+
+        parts.headers.insert(
+            http::header::CONTENT_TYPE,
+            http::HeaderValue::from_static("application/grpc"),
+        );
+        parts.status = http::StatusCode::OK;
+        http::Response::from_parts(parts, BoxBody::new(resp_body))
+    }
+
+    pub async fn unary<S, B>(
+        &mut self,
+        mut service: S,
+        req: http::Request<B>,
+    ) -> http::Response<BoxBody>
+    where
+        S: UnaryService<T::Decode, Response = T::Encode>,
+        B: Body + Send + 'static,
+        B::Error: Debug,
+    {
+        let (_parts, body) = req.into_parts();
+        let req_body = hyper::body::to_bytes(body).await.unwrap();
+        let v = req_body.chunk();
+        let mut req_byte = BytesMut::from(v);
+        let mut de = DecodeBuf::new(&mut req_byte, v.len());
+        let decoder = self
+            .codec
+            .decoder()
+            .decode(&mut de)
+            .map(|v| v.unwrap())
+            .unwrap();
+        let req = Request::new(decoder);
+
+        let resp = service.call(req).await;
+
+        let resp = match resp {
+            Ok(r) => r,
+            Err(status) => {
+                let (mut parts, _body) = http::Response::new(()).into_parts();
+                parts.headers.insert(
+                    http::header::CONTENT_TYPE,
+                    http::HeaderValue::from_static("application/grpc"),
+                );
+                parts.status = status.to_http().status();
+
+                return http::Response::from_parts(parts, crate::empty_body());
+            }
+        };
+        let (mut parts, body) = resp.into_http().into_parts();
+
+        // let data = hyper::body::aggregate(body)
+        // let b = body.size_hint();
+        let mut bytes = BytesMut::with_capacity(100);
+        let mut dst = EncodeBuf::new(&mut bytes);
+        let _res = self.codec.encoder().encode(body, &mut dst);
+        let data = bytes.to_vec();
+
+        let resp_body = hyper::Body::from(data);
+
+        parts.status = http::StatusCode::OK;
+        // http::Response::from_parts(parts, resp_body.map_err(|err| 
err.into()).boxed_unsync())
+        http::Response::from_parts(
+            parts,
+            resp_body
+                .map_err(|err| tonic::Status::new(tonic::Code::Internal, 
err.to_string()))
+                .boxed_unsync(),
+        )
+    }
+}
+
+impl<T> BusinessConfig for TripleServer<T> {
+    fn init() -> Self {
+        todo!()
+    }
+
+    fn load() -> Result<(), std::convert::Infallible> {
+        todo!()
+    }
+}
diff --git a/triple/src/server/service.rs b/triple/src/server/service.rs
new file mode 100644
index 0000000..cc2740c
--- /dev/null
+++ b/triple/src/server/service.rs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use super::decode::Streaming;
+use crate::invocation::{Request, Response};
+use futures_util::{Future, Stream};
+use tower_service::Service;
+
+pub trait StreamingSvc<R> {
+    // proto response
+    type Response;
+    // the stream of proto message
+    type ResponseStream: Stream<Item = Result<Self::Response, tonic::Status>>;
+    // future of stream of proto message
+    type Future: Future<Output = Result<Response<Self::ResponseStream>, 
tonic::Status>>;
+
+    fn call(&mut self, req: Request<Streaming<R>>) -> Self::Future;
+}
+
+impl<T, S, M1, M2> StreamingSvc<M1> for T
+where
+    T: Service<Request<Streaming<M1>>, Response = Response<S>, Error = 
tonic::Status>,
+    S: Stream<Item = Result<M2, tonic::Status>>,
+{
+    type Response = M2;
+
+    type ResponseStream = S;
+
+    type Future = T::Future;
+
+    fn call(&mut self, req: Request<Streaming<M1>>) -> Self::Future {
+        Service::call(self, req)
+    }
+}
+
+pub trait UnaryService<R> {
+    type Response;
+    type Future: Future<Output = Result<Response<Self::Response>, 
tonic::Status>>;
+
+    fn call(&mut self, req: Request<R>) -> Self::Future;
+}
+
+impl<T, M1, M2> UnaryService<M1> for T
+where
+    T: Service<Request<M1>, Response = Response<M2>, Error = tonic::Status>,
+{
+    type Response = M2;
+
+    type Future = T::Future;
+
+    fn call(&mut self, req: Request<M1>) -> Self::Future {
+        T::call(self, req)
+    }
+}
diff --git a/dubbo/src/lib.rs b/triple/src/transport/mod.rs
similarity index 92%
copy from dubbo/src/lib.rs
copy to triple/src/transport/mod.rs
index e292e47..1f772a6 100644
--- a/dubbo/src/lib.rs
+++ b/triple/src/transport/mod.rs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-pub mod common;
-pub mod helloworld;
-pub mod protocol;
-pub mod utils;
+pub mod service;
+
+pub use service::DubboServer;
diff --git a/triple/src/transport/service.rs b/triple/src/transport/service.rs
new file mode 100644
index 0000000..8001ef5
--- /dev/null
+++ b/triple/src/transport/service.rs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::collections::HashMap;
+use std::future;
+use std::net::SocketAddr;
+use std::task::Poll;
+
+use http::{Request, Response};
+use hyper::{body::Body, server::conn::AddrStream};
+use tokio::time::Duration;
+use tower::ServiceExt;
+use tower_service::Service;
+
+use crate::BoxBody;
+use config::BusinessConfig;
+use config::Config;
+
+type BoxService = tower::util::BoxService<Request<Body>, Response<BoxBody>, 
crate::Error>;
+type BoxCloneService = tower::util::BoxCloneService<Request<Body>, 
Response<BoxBody>, crate::Error>;
+
+#[derive(Default, Clone)]
+pub struct DubboServer {
+    accept_http2: bool,
+    init_stream_window_size: Option<u32>,
+    init_connection_window_size: Option<u32>,
+    max_concurrent_streams: Option<u32>,
+    max_frame_size: Option<u32>,
+    http2_keepalive_interval: Option<Duration>,
+    http2_keepalive_timeout: Option<Duration>,
+    services: HashMap<String, BoxCloneService>,
+}
+
+impl DubboServer {
+    pub fn with_accpet_http1(self, accept_http2: bool) -> Self {
+        Self {
+            accept_http2: accept_http2,
+            ..self
+        }
+    }
+
+    pub fn with_init_stream_window_size(self, stream_window: u32) -> Self {
+        Self {
+            init_stream_window_size: Some(stream_window),
+            ..self
+        }
+    }
+
+    pub fn with_init_connection_window_size(self, connection_window: u32) -> 
Self {
+        Self {
+            init_connection_window_size: Some(connection_window),
+            ..self
+        }
+    }
+    pub fn with_max_concurrent_streams(self, concurrent_streams: u32) -> Self {
+        Self {
+            max_concurrent_streams: Some(concurrent_streams),
+            ..self
+        }
+    }
+    pub fn with_max_frame_size(self, max_frame_size: u32) -> Self {
+        Self {
+            max_frame_size: Some(max_frame_size),
+            ..self
+        }
+    }
+    pub fn with_http2_keepalive_interval(self, interval: Duration) -> Self {
+        Self {
+            http2_keepalive_interval: Some(interval),
+            ..self
+        }
+    }
+
+    pub fn with_http2_keepalive_timeout(self, timeout: Duration) -> Self {
+        Self {
+            http2_keepalive_timeout: Some(timeout),
+            ..self
+        }
+    }
+}
+
+impl DubboServer {
+    pub fn new() -> Self {
+        Self {
+            accept_http2: true,
+            init_stream_window_size: None,
+            init_connection_window_size: None,
+            max_concurrent_streams: None,
+            http2_keepalive_interval: None,
+            http2_keepalive_timeout: None,
+            max_frame_size: None,
+            services: HashMap::new(),
+        }
+    }
+}
+
+impl DubboServer {
+    pub fn add_service<S>(mut self, name: String, service: S) -> Self
+    where
+        S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send 
+ 'static,
+        S::Future: Send + 'static,
+        S::Error: Into<crate::Error> + Send + 'static,
+    {
+        self.services
+            .insert(name, service.map_err(|err| err.into()).boxed_clone());
+        Self { ..self }
+    }
+
+    pub async fn serve(self, name: String, addr: SocketAddr) -> Result<(), 
crate::Error> {
+        let svc = MakeSvc {
+            inner: self.services.get(&name).unwrap().clone(),
+        };
+
+        hyper::Server::bind(&addr)
+            .http2_only(self.accept_http2)
+            .http2_max_concurrent_streams(self.max_concurrent_streams)
+            
.http2_initial_connection_window_size(self.init_connection_window_size)
+            .http2_initial_stream_window_size(self.init_stream_window_size)
+            .http2_keep_alive_interval(self.http2_keepalive_interval)
+            .http2_keep_alive_timeout(self.http2_keepalive_timeout.unwrap())
+            .http2_max_frame_size(self.max_frame_size)
+            .serve(svc)
+            .await
+            .map_err(|err| println!("Error: {:?}", err))
+            .unwrap();
+
+        Ok(())
+    }
+}
+
+struct MakeSvc<S> {
+    inner: S,
+}
+
+impl<S> Service<&AddrStream> for MakeSvc<S>
+where
+    S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 
'static,
+    S::Future: Send + 'static,
+    S::Error: Into<crate::Error> + Send + 'static,
+{
+    type Response = BoxService;
+    type Error = crate::Error;
+    type Future = future::Ready<Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> 
Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, _conn: &AddrStream) -> Self::Future {
+        let svc = self.inner.clone();
+        let s = svc.map_err(|err| err.into()).boxed();
+        future::ready(Ok(s))
+    }
+}
+
+impl BusinessConfig for DubboServer {
+    fn init() -> Self {
+        let conf = config::get_global_config();
+        let server = DubboServer::new()
+            
.with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string()));
+        server
+    }
+
+    fn load() -> Result<(), std::convert::Infallible> {
+        todo!()
+    }
+}

Reply via email to