This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit fb37cf280af4baebc14ccaff922b0b1fcfade01a Author: yangyang <[email protected]> AuthorDate: Tue Jul 19 22:35:59 2022 +0800 feat(triple): impl triple protocol based json serialization --- Cargo.toml | 3 +- config/src/config.rs | 86 ++++++++++ config/src/lib.rs | 4 + dubbo/Cargo.toml | 10 +- dubbo/src/common/url.rs | 2 +- dubbo/src/echo/echo_client.rs | 124 ++++++++++++++ dubbo/src/echo/echo_server.rs | 188 +++++++++++++++++++++ dubbo/src/echo/mod.rs | 214 ++++++++++++++++++++++++ dubbo/src/init.rs | 110 ++++++++++++ dubbo/src/lib.rs | 5 + dubbo/src/main.rs | 7 + dubbo/src/protocol/grpc/grpc_protocol.rs | 2 +- dubbo/src/protocol/invocation.rs | 44 +++++ dubbo/src/protocol/mod.rs | 11 +- dubbo/src/protocol/triple/mod.rs | 21 +++ dubbo/src/protocol/triple/triple_exporter.rs | 23 +++ dubbo/src/protocol/triple/triple_invoker.rs | 38 +++++ dubbo/src/protocol/triple/triple_protocol.rs | 48 ++++++ dubbo/src/protocol/triple/triple_server.rs | 44 +++++ {dubbo => triple}/Cargo.toml | 18 +- triple/src/client/grpc.rs | 155 +++++++++++++++++ dubbo/src/lib.rs => triple/src/client/mod.rs | 7 +- triple/src/codec/buffer.rs | 138 +++++++++++++++ triple/src/codec/mod.rs | 71 ++++++++ triple/src/codec/serde_codec.rs | 89 ++++++++++ triple/src/invocation.rs | 125 ++++++++++++++ dubbo/src/main.rs => triple/src/lib.rs | 22 ++- {dubbo => triple}/src/main.rs | 24 ++- dubbo/src/lib.rs => triple/src/server/consts.rs | 11 +- triple/src/server/decode.rs | 166 ++++++++++++++++++ triple/src/server/encode.rs | 169 +++++++++++++++++++ dubbo/src/main.rs => triple/src/server/mod.rs | 15 +- triple/src/server/server.rs | 138 +++++++++++++++ triple/src/server/service.rs | 68 ++++++++ dubbo/src/lib.rs => triple/src/transport/mod.rs | 7 +- triple/src/transport/service.rs | 181 ++++++++++++++++++++ 36 files changed, 2343 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 970aeb1..787689c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,6 @@ members = [ "metadata", "common", "config", - "dubbo" + "dubbo", + "triple" ] diff --git a/config/src/config.rs b/config/src/config.rs new file mode 100644 index 0000000..771c7ca --- /dev/null +++ b/config/src/config.rs @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{any, collections::HashMap}; + +/// used to storage all structed config, from some source: cmd, file..; +/// Impl Config trait, business init by read Config trait +#[allow(dead_code)] +pub struct RootConfig { + name: String, + data: HashMap<String, Box<dyn any::Any>>, +} + +pub fn get_global_config() -> RootConfig { + RootConfig::new() +} + +impl RootConfig { + pub fn new() -> Self { + Self { + name: "dubbo".to_string(), + data: HashMap::new(), + } + } + + pub fn load(&mut self) { + // 通过环境变量读取某个文件。加在到内存中 + self.data.insert( + "dubbo.provider.url".to_string(), + Box::new("dubbo://127.0.0.1:8888/?serviceName=hellworld".to_string()), + ); + // self.data.insert("dubbo.consume.", v) + } +} + +impl Config for RootConfig { + fn bool(&self, key: String) -> bool { + match self.data.get(&key) { + None => false, + Some(val) => { + if let Some(v) = val.downcast_ref::<bool>() { + return *v; + } else { + false + } + } + } + } + + fn string(&self, key: String) -> String { + match self.data.get(&key) { + None => "".to_string(), + Some(val) => { + if let Some(v) = val.downcast_ref::<String>() { + return v.into(); + } else { + "".to_string() + } + } + } + } +} + +pub trait BusinessConfig { + fn init() -> Self; + fn load() -> Result<(), std::convert::Infallible>; +} + +pub trait Config { + fn bool(&self, key: String) -> bool; + fn string(&self, key: String) -> String; +} diff --git a/config/src/lib.rs b/config/src/lib.rs index 3e01853..edbaf58 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -15,6 +15,10 @@ * limitations under the License. */ +pub mod config; + +pub use config::*; + #[cfg(test)] mod tests { #[test] diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 7ce6865..1a693ba 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -11,13 +11,14 @@ path = "src/helloworld/client.rs" [dependencies] h2 = {version = "0.3", optional = true} -hyper = "0.14.19" +hyper = { version = "0.14.19", features = ["full"]} http = "0.2" tonic = {version ="0.7.2", features = ["compression",]} tower-service = "0.3.1" http-body = "0.4.4" tower = "0.4.12" futures-util = {version = "0.3", default-features = false} +futures-core = {version = "0.3", default-features = false} tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] } prost-derive = {version = "0.10", optional = true} prost = "0.10.4" @@ -25,4 +26,11 @@ prost-types = { version = "0.6", default-features = false } lazy_static = "1.3.0" async-trait = "0.1.56" tower-layer = "0.3" +bytes = "1.0" +pin-project = "1.0" +serde_json = "1.0.82" +serde = {version="1.0.138", features = ["derive"]} +tokio-stream = "0.1" +config = {path = "../config"} +triple = {path = "../triple"} \ No newline at end of file diff --git a/dubbo/src/common/url.rs b/dubbo/src/common/url.rs index e610de3..5d7ce69 100644 --- a/dubbo/src/common/url.rs +++ b/dubbo/src/common/url.rs @@ -15,7 +15,7 @@ * limitations under the License. */ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Url { pub url: String, pub service_key: String, diff --git a/dubbo/src/echo/echo_client.rs b/dubbo/src/echo/echo_client.rs new file mode 100644 index 0000000..8db5e89 --- /dev/null +++ b/dubbo/src/echo/echo_client.rs @@ -0,0 +1,124 @@ +use std::str::FromStr; + +use super::echo_server::{HelloReply, HelloRequest}; +use bytes::Buf; + +use triple::client::TripleClient; +use triple::codec::serde_codec::SerdeCodec; +use triple::invocation::*; +use triple::server::Streaming; + +pub struct EchoClient { + inner: TripleClient, + uri: String, +} + +impl EchoClient { + pub fn new() -> Self { + Self { + inner: TripleClient::new(), + uri: "".to_string(), + } + } + + pub fn with_uri(mut self, uri: String) -> Self { + self.uri = uri; + self.inner = self + .inner + .with_authority(http::uri::Authority::from_str(&self.uri).unwrap()); + self + } + + // pub async fn connect(&self, url: &str) { + // self.inner.request(req) + // } + + pub async fn bidirectional_streaming_echo( + mut self, + req: impl IntoStreamingRequest<Message = HelloRequest>, + ) -> Result<Response<Streaming<HelloReply>>, tonic::Status> { + let codec = SerdeCodec::<HelloRequest, HelloReply>::default(); + self.inner + .bidi_streaming( + req, + codec, + http::uri::PathAndQuery::from_static("/bidi_stream"), + ) + .await + // Stream trait to Body + // let mut codec = SerdeCodec::<HelloRequest, HelloReply>::default(); + // let stream = req.into_streaming_request(); + // let en = encode(codec.encoder(), stream.into_inner().map(Ok)); + // let body = hyper::Body::wrap_stream(en); + + // let req = http::Request::builder() + // .version(Version::HTTP_2) + // .uri(self.uri.clone() + "/bidi_stream") + // .method("POST") + // .body(body) + // .unwrap(); + + // let response = self.inner.request(req).await; + + // match response { + // Ok(v) => { + // println!("response: {:?}", v); + // // println!("grpc status: {:?}", v) + // let mut resp = v.map(|body| Streaming::new(body, codec.decoder())); + // // TODO: rpc response to http response + // let trailers_only_status = tonic::Status::from_header_map(resp.headers_mut()); + // println!("trailer only status: {:?}", trailers_only_status); + + // let (parts, mut body) = resp.into_parts(); + // let trailer = body.trailer().await.unwrap(); + // println!("trailer: {:?}", trailer); + + // // if let Some(trailer) = trailer.take() { + // // println!("trailer: {:?}", trailer); + // // } + // return Ok(Response::new(body)); + // } + // Err(err) => { + // println!("error: {}", err); + // return Err(tonic::Status::new(tonic::Code::Internal, err.to_string())); + // } + // } + } + + pub async fn say_hello( + &self, + req: Request<HelloRequest>, + ) -> Result<Response<HelloReply>, tonic::Status> { + let (_parts, body) = req.into_parts(); + let v = serde_json::to_vec(&body).unwrap(); + let req = hyper::Request::builder() + .uri("http://".to_owned() + &self.uri.clone() + "/hello") + .method("POST") + .body(hyper::Body::from(v)) + .unwrap(); + + println!("request: {:?}", req); + let response = hyper::Client::builder().build_http().request(req).await; + + match response { + Ok(v) => { + println!("{:?}", v); + let (_parts, body) = v.into_parts(); + let req_body = hyper::body::to_bytes(body).await.unwrap(); + let v = req_body.chunk(); + // let codec = SerdeCodec::<HelloReply, HelloRequest>::default(); + let data: HelloReply = match serde_json::from_slice(v) { + Ok(data) => data, + Err(err) => { + return Err(tonic::Status::new(tonic::Code::Internal, err.to_string())) + } + }; + Ok(Response::new(data)) + } + Err(err) => { + println!("{}", err); + return Err(tonic::Status::new(tonic::Code::Internal, err.to_string())); + } + } + } +} diff --git a/dubbo/src/echo/echo_server.rs b/dubbo/src/echo/echo_server.rs new file mode 100644 index 0000000..d6e9476 --- /dev/null +++ b/dubbo/src/echo/echo_server.rs @@ -0,0 +1,188 @@ +use async_trait::async_trait; + +use tonic::codegen::BoxFuture; +use triple::codec::serde_codec::SerdeCodec; +// 定义EchoServer +// EchoServer 实现了自定义接口,同时可以处理请求分发 +use crate::protocol::triple::triple_invoker::TripleInvoker; +use crate::protocol::DubboGrpcService; +use crate::protocol::Invoker; +use http_body::Body; +use std::fmt::Debug; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use std::task::Poll; +use tower_service::Service; +use triple::invocation::{Request, Response}; +use triple::server::server::TripleServer; +use triple::server::service::{StreamingSvc, UnaryService}; +use triple::BoxBody; + +pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>; + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +pub struct HelloRequest { + pub name: String, +} + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +pub struct HelloReply { + pub reply: String, +} + +#[async_trait] +pub trait Echo: Send + Sync + 'static { + async fn hello( + &self, + req: Request<HelloRequest>, + ) -> Result<Response<HelloReply>, tonic::Status>; + + type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<HelloReply, tonic::Status>> + + Send + + 'static; + /// BidirectionalStreamingEcho is bidi streaming. + async fn bidirectional_streaming_echo( + &self, + request: Request<triple::server::Streaming<HelloRequest>>, + ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, tonic::Status>; +} + +struct _Inner<T>(Arc<T>); + +#[derive(Clone)] +pub struct EchoServer<T, I = TripleInvoker> { + inner: _Inner<T>, + invoker: Option<I>, +} + +impl<T, I> EchoServer<T, I> { + pub fn new(inner: T) -> Self { + Self { + inner: _Inner(Arc::new(inner)), + invoker: None, + } + } +} + +impl<T: Echo, I, B> Service<http::Request<B>> for EchoServer<T, I> +where + B: Body + Send + 'static, + B::Error: Into<StdError> + Debug + Send, + <B as Body>::Data: Send, + I: Invoker + Send, +{ + type Response = http::Response<BoxBody>; + + type Error = std::convert::Infallible; + + type Future = BoxFuture<Self::Response, Self::Error>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request<B>) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/hello" => { + struct UnaryServer<T> { + inner: _Inner<T>, + } + + impl<T: Echo> UnaryService<HelloRequest> for UnaryServer<T> { + type Response = HelloReply; + + type Future = BoxFuture<Response<Self::Response>, tonic::Status>; + + fn call(&mut self, req: Request<HelloRequest>) -> Self::Future { + let inner = self.inner.0.clone(); + let fut = async move { inner.hello(req).await }; + Box::pin(fut) + } + } + + let fut = async move { + let mut server = + TripleServer::new(SerdeCodec::<HelloReply, HelloRequest>::default()); + let resp = server.unary(UnaryServer { inner }, req).await; + Ok(resp) + }; + + Box::pin(fut) + } + "/bidi_stream" => { + struct StreamingServer<T> { + inner: _Inner<T>, + } + impl<T: Echo> StreamingSvc<HelloRequest> for StreamingServer<T> { + type Response = HelloReply; + + type ResponseStream = T::BidirectionalStreamingEchoStream; + + type Future = BoxFuture<Response<Self::ResponseStream>, tonic::Status>; + + fn call( + &mut self, + req: Request<triple::server::Streaming<HelloRequest>>, + ) -> Self::Future { + let inner = self.inner.0.clone(); + let fut = async move { inner.bidirectional_streaming_echo(req).await }; + Box::pin(fut) + } + } + + let fut = async move { + let mut server = + TripleServer::new(SerdeCodec::<HelloReply, HelloRequest>::default()); + let resp = server.bidi_streaming(StreamingServer { inner }, req).await; + Ok(resp) + }; + + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + // .body(hyper::Body::from("implement...").map_err(|err| match err {}).boxed()) + // .body(hyper::Body::from("implement...").map_err(|err| std::convert::Infallible).into()) + // .body(req.into_body()) + .body( + http_body::Empty::new() + .map_err(|err| match err {}) + .boxed_unsync(), + ) + .unwrap()) + }) + } + } + } +} + +impl<T, I> DubboGrpcService<I> for EchoServer<T, I> +where + I: Invoker + Send + Sync + 'static, +{ + fn set_proxy_impl(&mut self, invoker: I) { + self.invoker = Some(invoker); + } + + fn service_desc(&self) -> crate::protocol::server_desc::ServiceDesc { + todo!() + } +} + +impl<T> Clone for _Inner<T> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<T: Debug> Debug for _Inner<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Inner {:?}", self.0) + } +} diff --git a/dubbo/src/echo/mod.rs b/dubbo/src/echo/mod.rs new file mode 100644 index 0000000..356fa9a --- /dev/null +++ b/dubbo/src/echo/mod.rs @@ -0,0 +1,214 @@ +pub mod echo_client; +pub mod echo_server; + +use futures_util::Stream; +use futures_util::StreamExt; +use std::io::ErrorKind; +use std::pin::Pin; + +use tokio::sync::mpsc; + +use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; + +pub use self::echo_server::{Echo, EchoServer, HelloReply, HelloRequest}; +use triple::invocation::*; + +#[tokio::test] +async fn test_client() { + use self::echo_client::EchoClient; + use self::echo_server::HelloRequest; + use futures_util::StreamExt; + use triple::invocation::*; + + let cli = EchoClient::new().with_uri("127.0.0.1:8888".to_string()); + let resp = cli + .say_hello(Request::new(HelloRequest { + name: "message from client".to_string(), + })) + .await; + let resp = match resp { + Ok(resp) => resp, + Err(err) => return println!("{:?}", err), + }; + let (_parts, body) = resp.into_parts(); + println!("Response: {:?}", body); + + let data = vec![ + HelloRequest { + name: "msg1 from client".to_string(), + }, + HelloRequest { + name: "msg2 from client".to_string(), + }, + HelloRequest { + name: "msg3 from client".to_string(), + }, + ]; + let req = futures_util::stream::iter(data); + + let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap(); + + let (_parts, mut body) = bidi_resp.into_parts(); + // let trailer = body.trailer().await.unwrap(); + // println!("trailer: {:?}", trailer); + while let Some(item) = body.next().await { + match item { + Ok(v) => { + println!("reply: {:?}", v); + } + Err(err) => { + println!("err: {:?}", err); + } + } + } + let trailer = body.trailer().await.unwrap(); + println!("trailer: {:?}", trailer); +} + +type ResponseStream = Pin<Box<dyn Stream<Item = Result<HelloReply, tonic::Status>> + Send>>; + +#[tokio::test] +async fn test_server() { + use std::net::ToSocketAddrs; + use tokio::time::Duration; + + let esi = EchoServer::<EchoServerImpl>::new(EchoServerImpl { + name: "echo server impl".to_string(), + }); + // esi.set_proxy_impl(TripleInvoker); + + let name = "echoServer".to_string(); + println!("server listening, 0.0.0.0:8888"); + triple::transport::DubboServer::new() + .add_service(name.clone(), esi) + .with_http2_keepalive_timeout(Duration::from_secs(60)) + .serve( + name.clone(), + "0.0.0.0:8888".to_socket_addrs().unwrap().next().unwrap(), + ) + .await + .unwrap(); + // server.add_service(esi.into()); +} + +#[tokio::test] +async fn test_triple_protocol() { + use crate::common::url::Url; + use crate::protocol::triple::triple_protocol::TripleProtocol; + use crate::protocol::Protocol; + use crate::utils::boxed_clone::BoxCloneService; + + // crate::init::init(); + + let esi = EchoServer::<EchoServerImpl>::new(EchoServerImpl { + name: "echo".to_string(), + }); + + crate::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert("echo".to_string(), BoxCloneService::new(esi)); + + println!("triple server running, url: 0.0.0.0:8888"); + let pro = TripleProtocol::new(); + pro.export(Url { + url: "0.0.0.0:8888".to_string(), + service_key: "echo".to_string(), + }) + .await; +} + +#[allow(dead_code)] +#[derive(Default, Clone)] +struct EchoServerImpl { + name: String, +} + +// #[async_trait] +#[async_trait] +impl Echo for EchoServerImpl { + async fn hello( + &self, + req: Request<HelloRequest>, + ) -> Result<Response<HelloReply>, tonic::Status> { + println!("EchoServer::hello {:?}", req.message); + + Ok(Response::new(HelloReply { + reply: "hello, dubbo-rust".to_string(), + })) + } + + type BidirectionalStreamingEchoStream = ResponseStream; + + async fn bidirectional_streaming_echo( + &self, + request: Request<triple::server::Streaming<HelloRequest>>, + ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, tonic::Status> { + println!("EchoServer::bidirectional_streaming_echo"); + + let mut in_stream = request.into_inner(); + let (tx, rx) = mpsc::channel(128); + + // this spawn here is required if you want to handle connection error. + // If we just map `in_stream` and write it back as `out_stream` the `out_stream` + // will be drooped when connection error occurs and error will never be propagated + // to mapped version of `in_stream`. + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(v) => { + // if v.name.starts_with("msg2") { + // tx.send(Err(tonic::Status::internal(format!("err: args is invalid, {:?}", v.name)) + // )).await.expect("working rx"); + // continue; + // } + tx.send(Ok(HelloReply { + reply: format!("server reply: {:?}", v.name), + })) + .await + .expect("working rx") + } + Err(err) => { + if let Some(io_err) = match_for_io_error(&err) { + if io_err.kind() == ErrorKind::BrokenPipe { + // here you can handle special case when client + // disconnected in unexpected way + eprintln!("\tclient disconnected: broken pipe"); + break; + } + } + + match tx.send(Err(err)).await { + Ok(_) => (), + Err(_err) => break, // response was droped + } + } + } + } + println!("\tstream ended"); + }); + + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new( + Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream + )) + } +} + +fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> { + let mut err: &(dyn std::error::Error + 'static) = err_status; + + loop { + if let Some(io_err) = err.downcast_ref::<std::io::Error>() { + return Some(io_err); + } + + err = match err.source() { + Some(err) => err, + None => return None, + }; + } +} diff --git a/dubbo/src/init.rs b/dubbo/src/init.rs new file mode 100644 index 0000000..e0d738c --- /dev/null +++ b/dubbo/src/init.rs @@ -0,0 +1,110 @@ +// /// Server 启动的入口。业务侧需要调用该函数进行初始化 +// /// +// use std::collections::HashMap; + +// use crate::common::url::Url; +// use config::{BusinessConfig, RootConfig}; + +// pub fn init() { +// let _root_config = RootConfig::new().load(); +// let service_config = ServiceConfig::default() +// .group("helloworld".to_string()) +// .serializer("json".to_string()) +// .version("1.0".to_string()) +// .name("echo".to_string()); + +// let triple_config = ProtocolConfig::default() +// .name("triple".to_string()) +// .ip("0.0.0.0".to_string()) +// .port("8888".to_string()); + +// let _service_config = service_config.add_protocol_configs(triple_config); +// // 根据不同的协议,加载不同的配置 +// // 初始化全局的services +// // let server = DubboServer::init(); +// // let server = server.add_service("echo".to_string(), service); +// } + +// #[derive(Default)] +// pub struct ServiceConfig { +// version: String, +// group: String, +// name: String, +// protocol_names: Vec<String>, +// registry_names: Vec<String>, +// serializer: String, +// protocol_configs: HashMap<String, ProtocolConfig>, +// } + +// impl ServiceConfig { +// pub fn name(self, name: String) -> Self { +// Self { name, ..self } +// } + +// pub fn version(self, version: String) -> Self { +// Self { version, ..self } +// } + +// pub fn group(self, group: String) -> Self { +// Self { group, ..self } +// } + +// pub fn protocol_names(self, protocol_names: Vec<String>) -> Self { +// Self { +// protocol_names, +// ..self +// } +// } + +// pub fn serializer(self, serializer: String) -> Self { +// Self { serializer, ..self } +// } + +// pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) -> Self { +// self.protocol_configs +// .insert(protocol_config.name.clone(), protocol_config); +// Self { ..self } +// } + +// pub fn get_url(&self) -> Vec<Url> { +// let mut urls = Vec::new(); +// for (_, conf) in self.protocol_configs.iter() { +// urls.push(Url { +// url: conf.to_owned().to_url(), +// service_key: "".to_string(), +// }); +// } + +// urls +// } +// } + +// #[derive(Default, Debug, Clone)] +// pub struct ProtocolConfig { +// ip: String, +// port: String, +// name: String, +// params: HashMap<String, String>, +// } + +// impl ProtocolConfig { +// pub fn name(self, name: String) -> Self { +// Self { name, ..self } +// } + +// pub fn ip(self, ip: String) -> Self { +// Self { ip, ..self } +// } + +// pub fn port(self, port: String) -> Self { +// Self { port, ..self } +// } + +// pub fn params(self, params: HashMap<String, String>) -> Self { +// Self { params, ..self } +// } + +// pub fn to_url(self) -> String { +// format!("{}://{}:{}", self.name, self.ip, self.port).to_string() +// } +// } diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index e292e47..90ceb1c 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -16,6 +16,11 @@ */ pub mod common; +pub mod echo; pub mod helloworld; +pub mod init; pub mod protocol; +pub mod registry; pub mod utils; + +pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; diff --git a/dubbo/src/main.rs b/dubbo/src/main.rs index 906e5cb..db99336 100644 --- a/dubbo/src/main.rs +++ b/dubbo/src/main.rs @@ -16,11 +16,18 @@ */ pub mod common; +pub mod echo; pub mod helloworld; +pub mod init; pub mod protocol; +pub mod registry; pub mod utils; +pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; + #[tokio::main] async fn main() { println!("hello, dubbo-rust~") } + +// Refer service: tri://localhost:20000/helloworld.Greeter?app.version=3.0.0&application=dubbo.io&async=false&bean.name=GreeterClientImpl&cluster=failover&config.tracing=&environment=dev&generic=&group=&interface=helloworld.Greeter&loadbalance=&metadata-type=local&module=sample&name=dubbo.io&organization=dubbo-go&owner=dubbo-go&provided-by=&reference.filter=cshutdown®istry.role=0&release=dubbo-golang-3.0.0&retries=&serialization=&side=consumer&sticky=false×tamp=1657865138&version= diff --git a/dubbo/src/protocol/grpc/grpc_protocol.rs b/dubbo/src/protocol/grpc/grpc_protocol.rs index b33468c..bf63a78 100644 --- a/dubbo/src/protocol/grpc/grpc_protocol.rs +++ b/dubbo/src/protocol/grpc/grpc_protocol.rs @@ -53,7 +53,7 @@ impl Protocol for GrpcProtocol { todo!() } - async fn refer(&self, url: Url) -> Self::Invoker { + async fn refer(self, url: Url) -> Self::Invoker { GrpcInvoker::new(url) } diff --git a/dubbo/src/protocol/invocation.rs b/dubbo/src/protocol/invocation.rs index a0686b2..50ca421 100644 --- a/dubbo/src/protocol/invocation.rs +++ b/dubbo/src/protocol/invocation.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +use futures_core::Stream; use tonic::metadata::MetadataMap; pub struct Request<T> { @@ -30,9 +31,21 @@ impl<T> Request<T> { } } + pub fn into_inner(self) -> T { + self.message + } + pub fn into_parts(self) -> (MetadataMap, T) { (self.metadata, self.message) } + + pub fn from_http(req: http::Request<T>) -> Self { + let (parts, body) = req.into_parts(); + Request { + metadata: MetadataMap::from_headers(parts.headers), + message: body, + } + } } pub struct Response<T> { @@ -55,4 +68,35 @@ impl<T> Response<T> { pub fn into_parts(self) -> (MetadataMap, T) { (self.metadata, self.message) } + + pub fn map<F, U>(self, f: F) -> Response<U> + where + F: FnOnce(T) -> U, + { + let u = f(self.message); + Response { + message: u, + metadata: self.metadata, + } + } +} + +pub trait IntoStreamingRequest { + type Stream: Stream<Item = Self::Message> + Send + 'static; + type Message; + + fn into_streaming_request(self) -> Request<Self::Stream>; +} + +impl<T> IntoStreamingRequest for T +where + T: Stream + Send + 'static, +{ + type Stream = T; + + type Message = T::Item; + + fn into_streaming_request(self) -> Request<Self::Stream> { + Request::new(self) + } } diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 4199142..807c251 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -18,9 +18,12 @@ pub mod grpc; pub mod invocation; pub mod server_desc; +pub mod triple; use async_trait::async_trait; +use crate::utils::boxed_clone::BoxCloneService; + use crate::common::url::Url; #[async_trait] @@ -30,7 +33,7 @@ pub trait Protocol { fn destroy(&self); async fn export(self, url: Url) -> Self::Exporter; - async fn refer(&self, url: Url) -> Self::Invoker; + async fn refer(self, url: Url) -> Self::Invoker; } pub trait Exporter { @@ -53,3 +56,9 @@ pub trait DubboGrpcService<T> { fn set_proxy_impl(&mut self, invoker: T); fn service_desc(&self) -> server_desc::ServiceDesc; } + +pub type GrpcBoxCloneService = BoxCloneService< + http::Request<hyper::Body>, + http::Response<hyper::Body>, + std::convert::Infallible, +>; diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs new file mode 100644 index 0000000..32728c3 --- /dev/null +++ b/dubbo/src/protocol/triple/mod.rs @@ -0,0 +1,21 @@ +pub mod triple_exporter; +pub mod triple_invoker; +pub mod triple_protocol; +pub mod triple_server; + +use lazy_static::lazy_static; +use std::collections::HashMap; +use std::sync::RwLock; + +use crate::utils::boxed_clone::BoxCloneService; +use triple::BoxBody; + +pub type GrpcBoxCloneService = + BoxCloneService<http::Request<hyper::Body>, http::Response<BoxBody>, std::convert::Infallible>; + +lazy_static! { + // pub static ref DUBBO_GRPC_SERVICES: RwLock<HashMap<String, Box<dyn DubboGrpcService<GrpcInvoker> + Send + Sync + 'static>>> = + // RwLock::new(HashMap::new()); + pub static ref TRIPLE_SERVICES: RwLock<HashMap<String, GrpcBoxCloneService>> = + RwLock::new(HashMap::new()); +} diff --git a/dubbo/src/protocol/triple/triple_exporter.rs b/dubbo/src/protocol/triple/triple_exporter.rs new file mode 100644 index 0000000..5f66863 --- /dev/null +++ b/dubbo/src/protocol/triple/triple_exporter.rs @@ -0,0 +1,23 @@ +use super::triple_invoker::TripleInvoker; +use crate::protocol::Exporter; + +#[derive(Clone)] +pub struct TripleExporter {} + +impl TripleExporter { + pub fn new() -> Self { + TripleExporter {} + } +} + +impl Exporter for TripleExporter { + type InvokerType = TripleInvoker; + + fn unexport(&self) { + todo!() + } + + fn get_invoker(&self) -> Self::InvokerType { + todo!() + } +} diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs new file mode 100644 index 0000000..9f064ab --- /dev/null +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -0,0 +1,38 @@ +use crate::common::url::Url; +use crate::protocol::Invoker; + +#[allow(dead_code)] +#[derive(Clone, Default)] +pub struct TripleInvoker { + url: Url, +} + +impl TripleInvoker { + pub fn new(url: Url) -> TripleInvoker { + Self { url } + } +} + +impl Invoker for TripleInvoker { + fn invoke<M1>( + &self, + _req: crate::protocol::invocation::Request<M1>, + ) -> crate::protocol::invocation::Response<String> + where + M1: Send + 'static, + { + todo!() + } + + fn is_available(&self) -> bool { + todo!() + } + + fn destroy(&self) { + todo!() + } + + fn get_url(&self) -> Url { + todo!() + } +} diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs new file mode 100644 index 0000000..876f06c --- /dev/null +++ b/dubbo/src/protocol/triple/triple_protocol.rs @@ -0,0 +1,48 @@ +use std::collections::HashMap; + +use async_trait::async_trait; + +use super::triple_exporter::TripleExporter; +use super::triple_invoker::TripleInvoker; +use super::triple_server::TripleServer; +use crate::common::url::Url; +use crate::protocol::Protocol; + +pub struct TripleProtocol { + servers: HashMap<String, TripleServer>, +} + +impl TripleProtocol { + pub fn new() -> Self { + TripleProtocol { + servers: HashMap::new(), + } + } + + pub fn get_server(&self, name: String) -> Option<TripleServer> { + self.servers.get(&name).map(|data| data.to_owned()) + } +} + +#[async_trait] +impl Protocol for TripleProtocol { + type Invoker = TripleInvoker; + + type Exporter = TripleExporter; + + fn destroy(&self) { + todo!() + } + + async fn export(self, url: Url) -> Self::Exporter { + let server = TripleServer::new(url.service_key.clone()); + server.serve(url.url.clone()).await; + + TripleExporter::new() + } + + async fn refer(self, url: Url) -> Self::Invoker { + TripleInvoker::new(url) + // Self::Invoker + } +} diff --git a/dubbo/src/protocol/triple/triple_server.rs b/dubbo/src/protocol/triple/triple_server.rs new file mode 100644 index 0000000..b596bb9 --- /dev/null +++ b/dubbo/src/protocol/triple/triple_server.rs @@ -0,0 +1,44 @@ +use std::{net::ToSocketAddrs, str::FromStr}; + +use triple::transport::DubboServer; + +#[derive(Default, Clone)] +pub struct TripleServer { + s: DubboServer, + name: String, +} + +impl TripleServer { + pub fn new(name: String) -> TripleServer { + Self { + name, + ..Default::default() + } + } + + pub async fn serve(mut self, url: String) { + { + let lock = super::TRIPLE_SERVICES.read().unwrap(); + let svc = lock.get(&self.name).unwrap(); + + self.s = self.s.add_service(self.name.clone(), svc.clone()); + } + + let uri = http::Uri::from_str(&url.clone()).unwrap(); + let server = self.s.clone(); + + server + .serve( + self.name, + uri.authority() + .unwrap() + .to_string() + .to_socket_addrs() + .unwrap() + .next() + .unwrap(), + ) + .await + .unwrap(); + } +} diff --git a/dubbo/Cargo.toml b/triple/Cargo.toml similarity index 66% copy from dubbo/Cargo.toml copy to triple/Cargo.toml index 7ce6865..df17abd 100644 --- a/dubbo/Cargo.toml +++ b/triple/Cargo.toml @@ -1,23 +1,20 @@ [package] -name = "dubbo" +name = "triple" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[[bin]] -name = "helloworld-client" -path = "src/helloworld/client.rs" - [dependencies] h2 = {version = "0.3", optional = true} -hyper = "0.14.19" +hyper = {version="0.14.19", features = ["full"]} http = "0.2" tonic = {version ="0.7.2", features = ["compression",]} tower-service = "0.3.1" http-body = "0.4.4" tower = "0.4.12" futures-util = {version = "0.3", default-features = false} +futures-core = {version = "0.3", default-features = false} tokio = { version = "1.0", features = [ "rt-multi-thread", "time", "fs", "macros", "net", "signal"] } prost-derive = {version = "0.10", optional = true} prost = "0.10.4" @@ -25,4 +22,13 @@ prost-types = { version = "0.6", default-features = false } lazy_static = "1.3.0" async-trait = "0.1.56" tower-layer = "0.3" +pin-project = "1.0" +axum = "0.5.9" +bytes = "1.0" +serde_json = "1.0.82" +serde = {version="1.0.138", features = ["derive"]} +async-stream = "0.3" +tokio-stream = "0.1" +# for test in codegen +config = {path = "../config"} \ No newline at end of file diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs new file mode 100644 index 0000000..5126ed1 --- /dev/null +++ b/triple/src/client/grpc.rs @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use futures_util::{StreamExt, TryStreamExt}; +use http::HeaderValue; + +use crate::codec::Codec; +use crate::invocation::{IntoStreamingRequest, Response}; +use crate::server::encode::encode; +use crate::server::Streaming; + +#[derive(Debug, Clone, Default)] +pub struct TripleClient { + host: Option<http::uri::Authority>, + inner: ConnectionPool, +} + +#[derive(Debug, Default, Clone)] +pub struct ConnectionPool { + http2_only: bool, +} + +impl ConnectionPool { + pub fn new() -> Self { + ConnectionPool { http2_only: true } + } + + pub fn builder(self) -> hyper::Client<hyper::client::HttpConnector> { + hyper::Client::builder() + .http2_only(self.http2_only) + .build_http() + } +} + +// TODO: initial connection pool +impl TripleClient { + pub fn new() -> Self { + TripleClient { + host: None, + inner: ConnectionPool::new(), + } + } + + pub fn with_authority(self, host: http::uri::Authority) -> Self { + TripleClient { + host: Some(host), + ..self + } + } +} + +impl TripleClient { + pub async fn bidi_streaming<C, M1, M2>( + &mut self, + req: impl IntoStreamingRequest<Message = M1>, + mut codec: C, + path: http::uri::PathAndQuery, + ) -> Result<Response<Streaming<M2>>, tonic::Status> + where + C: Codec<Encode = M1, Decode = M2>, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, + { + let req = req.into_streaming_request(); + let en = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream(); + let body = hyper::Body::wrap_stream(en); + + let mut parts = http::uri::Parts::default(); + parts.path_and_query = Some(path); + parts.authority = self.host.clone(); + parts.scheme = Some(http::uri::Scheme::HTTP); + + let uri = http::Uri::from_parts(parts).unwrap(); + + let mut req = hyper::Request::builder() + .version(http::Version::HTTP_2) + .uri(uri.clone()) + .method("POST") + .body(body) + .unwrap(); + *req.version_mut() = http::Version::HTTP_2; + req.headers_mut() + .insert("method", HeaderValue::from_static("POST")); + req.headers_mut().insert( + "scheme", + HeaderValue::from_str(uri.clone().scheme_str().unwrap()).unwrap(), + ); + req.headers_mut() + .insert("path", HeaderValue::from_str(uri.clone().path()).unwrap()); + req.headers_mut().insert( + "authority", + HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(), + ); + req.headers_mut().insert( + "content-type", + HeaderValue::from_static("application/grpc+json"), + ); + req.headers_mut() + .insert("user-agent", HeaderValue::from_static("dubbo-rust/1.0.0")); + req.headers_mut() + .insert("te", HeaderValue::from_static("trailers")); + req.headers_mut().insert( + "tri-service-version", + HeaderValue::from_static("dubbo-rust/1.0.0"), + ); + req.headers_mut() + .insert("tri-service-group", HeaderValue::from_static("cluster")); + req.headers_mut().insert( + "tri-unit-info", + HeaderValue::from_static("dubbo-rust/1.0.0"), + ); + + // const ( + // TripleContentType = "application/grpc+proto" + // TripleUserAgent = "grpc-go/1.35.0-dev" + // TripleServiceVersion = "tri-service-version" + // TripleAttachement = "tri-attachment" + // TripleServiceGroup = "tri-service-group" + // TripleRequestID = "tri-req-id" + // TripleTraceID = "tri-trace-traceid" + // TripleTraceRPCID = "tri-trace-rpcid" + // TripleTraceProtoBin = "tri-trace-proto-bin" + // TripleUnitInfo = "tri-unit-info" + // ) + + let cli = self.inner.clone().builder(); + let response = cli.request(req).await; + + match response { + Ok(v) => { + let resp = v.map(|body| Streaming::new(body, codec.decoder())); + + let (_parts, body) = resp.into_parts(); + return Ok(Response::new(body)); + } + Err(err) => { + return Err(tonic::Status::new(tonic::Code::Internal, err.to_string())); + } + } + } +} diff --git a/dubbo/src/lib.rs b/triple/src/client/mod.rs similarity index 92% copy from dubbo/src/lib.rs copy to triple/src/client/mod.rs index e292e47..5b2603e 100644 --- a/dubbo/src/lib.rs +++ b/triple/src/client/mod.rs @@ -15,7 +15,6 @@ * limitations under the License. */ -pub mod common; -pub mod helloworld; -pub mod protocol; -pub mod utils; +pub mod grpc; + +pub use grpc::TripleClient; diff --git a/triple/src/codec/buffer.rs b/triple/src/codec/buffer.rs new file mode 100644 index 0000000..850f988 --- /dev/null +++ b/triple/src/codec/buffer.rs @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use bytes::buf::UninitSlice; +use bytes::{Buf, BufMut, BytesMut}; + +/// A specialized buffer to decode gRPC messages from. +#[derive(Debug)] +pub struct DecodeBuf<'a> { + buf: &'a mut BytesMut, + len: usize, +} + +/// A specialized buffer to encode gRPC messages into. +#[derive(Debug)] +pub struct EncodeBuf<'a> { + buf: &'a mut BytesMut, +} + +impl<'a> DecodeBuf<'a> { + pub fn new(buf: &'a mut BytesMut, len: usize) -> Self { + DecodeBuf { buf, len } + } +} + +impl Buf for DecodeBuf<'_> { + #[inline] + fn remaining(&self) -> usize { + self.len + } + + #[inline] + fn chunk(&self) -> &[u8] { + let ret = self.buf.chunk(); + + if ret.len() > self.len { + &ret[..self.len] + } else { + ret + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + assert!(cnt <= self.len); + self.buf.advance(cnt); + self.len -= cnt; + } +} + +impl<'a> EncodeBuf<'a> { + pub fn new(buf: &'a mut BytesMut) -> Self { + EncodeBuf { buf } + } +} + +impl EncodeBuf<'_> { + /// Reserves capacity for at least `additional` more bytes to be inserted + /// into the buffer. + /// + /// More than `additional` bytes may be reserved in order to avoid frequent + /// reallocations. A call to `reserve` may result in an allocation. + #[inline] + pub fn reserve(&mut self, additional: usize) { + self.buf.reserve(additional); + } +} + +unsafe impl BufMut for EncodeBuf<'_> { + #[inline] + fn remaining_mut(&self) -> usize { + self.buf.remaining_mut() + } + + #[inline] + unsafe fn advance_mut(&mut self, cnt: usize) { + self.buf.advance_mut(cnt) + } + + #[inline] + fn chunk_mut(&mut self) -> &mut UninitSlice { + self.buf.chunk_mut() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn decode_buf() { + let mut payload = BytesMut::with_capacity(100); + payload.put(&vec![0u8; 50][..]); + let mut buf = DecodeBuf::new(&mut payload, 20); + + assert_eq!(buf.len, 20); + assert_eq!(buf.remaining(), 20); + assert_eq!(buf.chunk().len(), 20); + + buf.advance(10); + assert_eq!(buf.remaining(), 10); + + let mut out = [0; 5]; + buf.copy_to_slice(&mut out); + assert_eq!(buf.remaining(), 5); + assert_eq!(buf.chunk().len(), 5); + + assert_eq!(buf.copy_to_bytes(5).len(), 5); + assert!(!buf.has_remaining()); + } + + #[test] + fn encode_buf() { + let mut bytes = BytesMut::with_capacity(100); + let mut buf = EncodeBuf::new(&mut bytes); + + let initial = buf.remaining_mut(); + unsafe { buf.advance_mut(20) }; + assert_eq!(buf.remaining_mut(), initial - 20); + + buf.put_u8(b'a'); + assert_eq!(buf.remaining_mut(), initial - 20 - 1); + } +} diff --git a/triple/src/codec/mod.rs b/triple/src/codec/mod.rs new file mode 100644 index 0000000..f6e61d9 --- /dev/null +++ b/triple/src/codec/mod.rs @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod buffer; +pub mod serde_codec; + +use std::io; + +pub use self::buffer::{DecodeBuf, EncodeBuf}; +use tonic::Status; + +pub trait Codec { + /// The encodable message. + type Encode: Send + 'static; + /// The decodable message. + type Decode: Send + 'static; + + /// The encoder that can encode a message. + type Encoder: Encoder<Item = Self::Encode, Error = Status> + Send + 'static; + /// The encoder that can decode a message. + type Decoder: Decoder<Item = Self::Decode, Error = Status> + Send + 'static; + + /// Fetch the encoder. + fn encoder(&mut self) -> Self::Encoder; + /// Fetch the decoder. + fn decoder(&mut self) -> Self::Decoder; +} + +/// Encodes gRPC message types +pub trait Encoder { + /// The type that is encoded. + type Item; + + /// The type of encoding errors. + /// + /// The type of unrecoverable frame encoding errors. + type Error: From<io::Error>; + + /// Encodes a message into the provided buffer. + fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error>; +} + +/// Decodes gRPC message types +pub trait Decoder { + /// The type that is decoded. + type Item; + + /// The type of unrecoverable frame decoding errors. + type Error: From<io::Error>; + + /// Decode a message from the buffer. + /// + /// The buffer will contain exactly the bytes of a full message. There + /// is no need to get the length from the bytes, gRPC framing is handled + /// for you. + fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error>; +} diff --git a/triple/src/codec/serde_codec.rs b/triple/src/codec/serde_codec.rs new file mode 100644 index 0000000..143eb56 --- /dev/null +++ b/triple/src/codec/serde_codec.rs @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::marker::PhantomData; + +use bytes::{Buf, BufMut}; +use serde::{Deserialize, Serialize}; + +use super::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; + +#[derive(Debug)] +pub struct SerdeCodec<T, U> { + _pd: PhantomData<(T, U)>, +} + +impl<T, U> Default for SerdeCodec<T, U> { + fn default() -> Self { + Self { _pd: PhantomData } + } +} + +impl<'a, T, U> Codec for SerdeCodec<T, U> +where + T: Serialize + Send + 'static, + U: Deserialize<'a> + Send + 'static, +{ + type Encode = T; + + type Decode = U; + + type Encoder = SerdeEncoder<T>; + + type Decoder = SerdeDecoder<U>; + + fn encoder(&mut self) -> Self::Encoder { + SerdeEncoder(PhantomData) + } + + fn decoder(&mut self) -> Self::Decoder { + SerdeDecoder(PhantomData) + } +} + +#[derive(Debug, Clone)] +pub struct SerdeEncoder<T>(PhantomData<T>); + +impl<T: Serialize> Encoder for SerdeEncoder<T> { + type Item = T; + + type Error = tonic::Status; + + fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + item.serialize(&mut serde_json::Serializer::new(dst.writer())) + .expect("failed to searialize"); + + Ok(()) + } +} + +pub struct SerdeDecoder<U>(PhantomData<U>); + +impl<'a, U: Deserialize<'a>> Decoder for SerdeDecoder<U> { + type Item = U; + + type Error = tonic::Status; + + fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> { + let value = src.chunk().to_owned(); + let mut msg = vec![0u8; value.len()]; + src.copy_to_slice(&mut msg); + + let mut de = serde_json::Deserializer::from_reader(msg.reader()); + Ok(Some(U::deserialize(&mut de).unwrap())) + } +} diff --git a/triple/src/invocation.rs b/triple/src/invocation.rs new file mode 100644 index 0000000..1655312 --- /dev/null +++ b/triple/src/invocation.rs @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use futures_core::Stream; +use tonic::metadata::MetadataMap; + +pub struct Request<T> { + pub message: T, + pub metadata: MetadataMap, +} + +impl<T> Request<T> { + pub fn new(message: T) -> Request<T> { + Self { + message, + metadata: MetadataMap::new(), + } + } + + pub fn into_inner(self) -> T { + self.message + } + + pub fn into_parts(self) -> (MetadataMap, T) { + (self.metadata, self.message) + } + + pub fn from_http(req: http::Request<T>) -> Self { + let (parts, body) = req.into_parts(); + Request { + metadata: MetadataMap::from_headers(parts.headers), + message: body, + } + } + + pub fn into_http(self) -> http::Request<T> { + let mut http_req = http::Request::new(self.message); + *http_req.version_mut() = http::Version::HTTP_2; + *http_req.headers_mut() = self.metadata.into_headers(); + + http_req + } +} + +pub struct Response<T> { + message: T, + metadata: MetadataMap, +} + +impl<T> Response<T> { + pub fn new(message: T) -> Response<T> { + Self { + message, + metadata: MetadataMap::new(), + } + } + + pub fn from_parts(metadata: MetadataMap, message: T) -> Self { + Self { message, metadata } + } + + pub fn into_parts(self) -> (MetadataMap, T) { + (self.metadata, self.message) + } + + pub fn into_http(self) -> http::Response<T> { + let mut http_resp = http::Response::new(self.message); + *http_resp.version_mut() = http::Version::HTTP_2; + *http_resp.headers_mut() = self.metadata.into_headers(); + + http_resp + } + + pub fn map<F, U>(self, f: F) -> Response<U> + where + F: FnOnce(T) -> U, + { + let u = f(self.message); + Response { + message: u, + metadata: self.metadata, + } + } +} + +pub trait IntoStreamingRequest { + type Stream: Stream<Item = Self::Message> + Send + 'static; + type Message; + + fn into_streaming_request(self) -> Request<Self::Stream>; +} + +impl<T> IntoStreamingRequest for T +where + T: Stream + Send + 'static, + // T::Item: Result<Self::Message, std::convert::Infallible>, +{ + type Stream = T; + + type Message = T::Item; + + fn into_streaming_request(self) -> Request<Self::Stream> { + Request::new(self) + } +} + +// impl<T> sealed::Sealed for T {} + +// pub mod sealed { +// pub trait Sealed {} +// } diff --git a/dubbo/src/main.rs b/triple/src/lib.rs similarity index 67% copy from dubbo/src/main.rs copy to triple/src/lib.rs index 906e5cb..42a3c6d 100644 --- a/dubbo/src/main.rs +++ b/triple/src/lib.rs @@ -15,12 +15,20 @@ * limitations under the License. */ -pub mod common; -pub mod helloworld; -pub mod protocol; -pub mod utils; +pub mod client; +pub mod codec; +pub mod invocation; +pub mod server; +pub mod transport; -#[tokio::main] -async fn main() { - println!("hello, dubbo-rust~") +use http_body::Body; + +pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; + +pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, tonic::Status>; + +pub fn empty_body() -> BoxBody { + http_body::Empty::new() + .map_err(|err| match err {}) + .boxed_unsync() } diff --git a/dubbo/src/main.rs b/triple/src/main.rs similarity index 65% copy from dubbo/src/main.rs copy to triple/src/main.rs index 906e5cb..4350696 100644 --- a/dubbo/src/main.rs +++ b/triple/src/main.rs @@ -15,12 +15,22 @@ * limitations under the License. */ -pub mod common; -pub mod helloworld; -pub mod protocol; -pub mod utils; +pub mod client; +pub mod codec; +pub mod invocation; +pub mod server; +pub mod transport; -#[tokio::main] -async fn main() { - println!("hello, dubbo-rust~") +use http_body::Body; + +pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; +pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, tonic::Status>; +pub fn empty_body() -> BoxBody { + http_body::Empty::new() + .map_err(|err| match err {}) + .boxed_unsync() +} + +fn main() { + println!("hello, triple"); } diff --git a/dubbo/src/lib.rs b/triple/src/server/consts.rs similarity index 80% copy from dubbo/src/lib.rs copy to triple/src/server/consts.rs index e292e47..4898211 100644 --- a/dubbo/src/lib.rs +++ b/triple/src/server/consts.rs @@ -15,7 +15,10 @@ * limitations under the License. */ -pub mod common; -pub mod helloworld; -pub mod protocol; -pub mod utils; +pub const BUFFER_SIZE: usize = 1024 * 8; +// 5 bytes +pub const HEADER_SIZE: usize = + // compression flag + std::mem::size_of::<u8>() + + // data length + std::mem::size_of::<u32>(); diff --git a/triple/src/server/decode.rs b/triple/src/server/decode.rs new file mode 100644 index 0000000..06ce3ae --- /dev/null +++ b/triple/src/server/decode.rs @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{pin::Pin, task::Poll}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use futures_util::Stream; +use futures_util::{future, ready}; +use http_body::Body; +use tonic::metadata::MetadataMap; + +use crate::codec::{DecodeBuf, Decoder}; + +type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, tonic::Status>; + +pub struct Streaming<T> { + state: State, + body: BoxBody, + decoder: Box<dyn Decoder<Item = T, Error = tonic::Status> + Send + 'static>, + buf: BytesMut, + trailers: Option<MetadataMap>, +} + +#[derive(PartialEq)] +enum State { + ReadHeader, + ReadBody { len: usize }, + Error, +} + +impl<T> Streaming<T> { + pub fn new<B, D>(body: B, decoder: D) -> Self + where + B: Body + Send + 'static, + B::Error: Into<crate::Error>, + D: Decoder<Item = T, Error = tonic::Status> + Send + 'static, + { + Self { + state: State::ReadHeader, + body: body + .map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + .map_err(|_err| tonic::Status::internal("err")) + .boxed_unsync(), + decoder: Box::new(decoder), + buf: BytesMut::with_capacity(super::consts::BUFFER_SIZE), + trailers: None, + } + } + + pub async fn message(&mut self) -> Result<Option<T>, tonic::Status> { + match future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await { + Some(Ok(res)) => Ok(Some(res)), + Some(Err(err)) => Err(err), + None => Ok(None), + } + } + + pub async fn trailer(&mut self) -> Result<Option<MetadataMap>, tonic::Status> { + if let Some(t) = self.trailers.take() { + return Ok(Some(t)); + } + // while self.message().await?.is_some() {} + + let trailer = future::poll_fn(|cx| Pin::new(&mut self.body).poll_trailers(cx)).await; + let trailer = trailer.map(|data| data.map(MetadataMap::from_headers)); + trailer + } + + pub fn decode_chunk(&mut self) -> Result<Option<T>, tonic::Status> { + if self.state == State::ReadHeader { + // buffer is full + if self.buf.remaining() < super::consts::HEADER_SIZE { + return Ok(None); + } + + let _is_compressed = self.buf.get_u8(); + let len = self.buf.get_u32() as usize; + self.buf.reserve(len as usize); + + self.state = State::ReadBody { len } + } + + if let State::ReadBody { len } = self.state { + if self.buf.remaining() < len || self.buf.len() < len { + return Ok(None); + } + + let decoding_result = self.decoder.decode(&mut DecodeBuf::new(&mut self.buf, len)); + + return match decoding_result { + Ok(Some(r)) => { + self.state = State::ReadHeader; + Ok(Some(r)) + } + Ok(None) => Ok(None), + Err(err) => Err(err), + }; + } + + Ok(None) + } +} + +impl<T> Stream for Streaming<T> { + type Item = Result<T, tonic::Status>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { + loop { + if self.state == State::Error { + return Poll::Ready(None); + } + + if let Some(item) = self.decode_chunk()? { + return Poll::Ready(Some(Ok(item))); + } + + let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { + Some(Ok(d)) => Some(d), + Some(Err(e)) => { + let _ = std::mem::replace(&mut self.state, State::Error); + let err: crate::Error = e.into(); + return Poll::Ready(Some(Err(tonic::Status::internal(err.to_string())))); + } + None => None, + }; + + if let Some(data) = chunk { + self.buf.put(data) + } else { + break; + } + } + + match ready!(Pin::new(&mut self.body).poll_trailers(cx)) { + Ok(trailer) => { + self.trailers = trailer.map(MetadataMap::from_headers); + } + Err(err) => { + println!("poll_trailers, err: {}", err); + } + } + + Poll::Ready(None) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, None) + } +} diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs new file mode 100644 index 0000000..c76f448 --- /dev/null +++ b/triple/src/server/encode.rs @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::{pin::Pin, task::Poll}; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures_core::{Stream, TryStream}; +use futures_util::{ready, StreamExt, TryStreamExt}; +use http_body::Body; +use pin_project::pin_project; +use tonic::Status; + +use crate::codec::{EncodeBuf, Encoder}; + +#[allow(unused_must_use)] +pub fn encode<E, B>(mut encoder: E, resp_body: B) -> impl TryStream<Ok = Bytes, Error = Status> +where + E: Encoder<Error = Status>, + B: Stream<Item = Result<E::Item, Status>>, +{ + async_stream::stream! { + let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE); + futures_util::pin_mut!(resp_body); + + loop { + match resp_body.next().await { + Some(Ok(item)) => { + // 编码数据到缓冲中 + buf.reserve(super::consts::HEADER_SIZE); + unsafe { + buf.advance_mut(super::consts::HEADER_SIZE); + } + encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| tonic::Status::internal("encode error")); + + let len = buf.len() - super::consts::HEADER_SIZE; + { + let mut buf = &mut buf[..super::consts::HEADER_SIZE]; + buf.put_u8(1); + buf.put_u32(len as u32); + } + + yield Ok(buf.split_to(len + super::consts::HEADER_SIZE).freeze()); + }, + Some(Err(err)) => yield Err(err.into()), + None => break, + } + } + } +} + +pub fn encode_server<E, B>( + encoder: E, + body: B, +) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>> +where + E: Encoder<Error = Status>, + B: Stream<Item = Result<E::Item, Status>>, +{ + let s = encode(encoder, body).into_stream(); + EncodeBody::new_server(s) +} + +pub fn encode_client<E, B>( + encoder: E, + body: B, +) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>> +where + E: Encoder<Error = Status>, + B: Stream<Item = E::Item>, +{ + let s = encode(encoder, body.map(Ok)).into_stream(); + EncodeBody::new_client(s) +} + +#[derive(Debug)] +enum Role { + Server, + Client, +} +#[pin_project] +pub struct EncodeBody<S> { + #[pin] + inner: S, + role: Role, + is_end_stream: bool, + error: Option<tonic::Status>, +} + +impl<S> EncodeBody<S> { + pub fn new_server(inner: S) -> Self { + Self { + inner, + role: Role::Server, + is_end_stream: false, + error: None, + } + } + + pub fn new_client(inner: S) -> Self { + Self { + inner, + role: Role::Client, + is_end_stream: false, + error: None, + } + } +} + +impl<S> Body for EncodeBody<S> +where + S: Stream<Item = Result<Bytes, tonic::Status>>, +{ + type Data = Bytes; + + type Error = tonic::Status; + + fn is_end_stream(&self) -> bool { + self.is_end_stream + } + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Result<Self::Data, Self::Error>>> { + let mut self_proj = self.project(); + match ready!(self_proj.inner.try_poll_next_unpin(cx)) { + Some(Ok(d)) => Some(Ok(d)).into(), + Some(Err(status)) => { + *self_proj.error = Some(status); + None.into() + } + None => None.into(), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { + let self_proj = self.project(); + if *self_proj.is_end_stream { + return Poll::Ready(Ok(None)); + } + + let status = if let Some(status) = self_proj.error.take() { + *self_proj.is_end_stream = true; + status + } else { + tonic::Status::ok("") + }; + let http = status.to_http(); + println!("status: {:?}", http.headers().clone()); + + Poll::Ready(Ok(Some(http.headers().to_owned()))) + } +} diff --git a/dubbo/src/main.rs b/triple/src/server/mod.rs similarity index 84% copy from dubbo/src/main.rs copy to triple/src/server/mod.rs index 906e5cb..8be7239 100644 --- a/dubbo/src/main.rs +++ b/triple/src/server/mod.rs @@ -15,12 +15,11 @@ * limitations under the License. */ -pub mod common; -pub mod helloworld; -pub mod protocol; -pub mod utils; +pub mod consts; +pub mod decode; +pub mod encode; +pub mod server; +pub mod service; -#[tokio::main] -async fn main() { - println!("hello, dubbo-rust~") -} +pub use decode::Streaming; +pub use encode::{encode, encode_server}; diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs new file mode 100644 index 0000000..6474d6c --- /dev/null +++ b/triple/src/server/server.rs @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use bytes::{Buf, BytesMut}; +use http_body::Body; +use std::fmt::Debug; + +use crate::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; +use crate::invocation::Request; +use crate::server::encode::encode_server; +use crate::server::service::{StreamingSvc, UnaryService}; +use crate::server::Streaming; +use crate::BoxBody; +use config::BusinessConfig; + +pub struct TripleServer<T> { + codec: T, +} + +impl<T> TripleServer<T> { + pub fn new(codec: T) -> Self { + Self { codec } + } +} + +impl<T> TripleServer<T> +where + T: Codec, +{ + pub async fn bidi_streaming<S, B>( + &mut self, + mut service: S, + req: http::Request<B>, + ) -> http::Response<BoxBody> + where + S: StreamingSvc<T::Decode, Response = T::Encode>, + S::ResponseStream: Send + 'static, + B: Body + Send + 'static, + B::Error: Into<crate::Error> + Send, + { + let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder())); + + let resp = service.call(Request::from_http(req_stream)).await; + + let (mut parts, resp_body) = resp.unwrap().into_http().into_parts(); + let resp_body = encode_server(self.codec.encoder(), resp_body); + + parts.headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/grpc"), + ); + parts.status = http::StatusCode::OK; + http::Response::from_parts(parts, BoxBody::new(resp_body)) + } + + pub async fn unary<S, B>( + &mut self, + mut service: S, + req: http::Request<B>, + ) -> http::Response<BoxBody> + where + S: UnaryService<T::Decode, Response = T::Encode>, + B: Body + Send + 'static, + B::Error: Debug, + { + let (_parts, body) = req.into_parts(); + let req_body = hyper::body::to_bytes(body).await.unwrap(); + let v = req_body.chunk(); + let mut req_byte = BytesMut::from(v); + let mut de = DecodeBuf::new(&mut req_byte, v.len()); + let decoder = self + .codec + .decoder() + .decode(&mut de) + .map(|v| v.unwrap()) + .unwrap(); + let req = Request::new(decoder); + + let resp = service.call(req).await; + + let resp = match resp { + Ok(r) => r, + Err(status) => { + let (mut parts, _body) = http::Response::new(()).into_parts(); + parts.headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/grpc"), + ); + parts.status = status.to_http().status(); + + return http::Response::from_parts(parts, crate::empty_body()); + } + }; + let (mut parts, body) = resp.into_http().into_parts(); + + // let data = hyper::body::aggregate(body) + // let b = body.size_hint(); + let mut bytes = BytesMut::with_capacity(100); + let mut dst = EncodeBuf::new(&mut bytes); + let _res = self.codec.encoder().encode(body, &mut dst); + let data = bytes.to_vec(); + + let resp_body = hyper::Body::from(data); + + parts.status = http::StatusCode::OK; + // http::Response::from_parts(parts, resp_body.map_err(|err| err.into()).boxed_unsync()) + http::Response::from_parts( + parts, + resp_body + .map_err(|err| tonic::Status::new(tonic::Code::Internal, err.to_string())) + .boxed_unsync(), + ) + } +} + +impl<T> BusinessConfig for TripleServer<T> { + fn init() -> Self { + todo!() + } + + fn load() -> Result<(), std::convert::Infallible> { + todo!() + } +} diff --git a/triple/src/server/service.rs b/triple/src/server/service.rs new file mode 100644 index 0000000..cc2740c --- /dev/null +++ b/triple/src/server/service.rs @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::decode::Streaming; +use crate::invocation::{Request, Response}; +use futures_util::{Future, Stream}; +use tower_service::Service; + +pub trait StreamingSvc<R> { + // proto response + type Response; + // the stream of proto message + type ResponseStream: Stream<Item = Result<Self::Response, tonic::Status>>; + // future of stream of proto message + type Future: Future<Output = Result<Response<Self::ResponseStream>, tonic::Status>>; + + fn call(&mut self, req: Request<Streaming<R>>) -> Self::Future; +} + +impl<T, S, M1, M2> StreamingSvc<M1> for T +where + T: Service<Request<Streaming<M1>>, Response = Response<S>, Error = tonic::Status>, + S: Stream<Item = Result<M2, tonic::Status>>, +{ + type Response = M2; + + type ResponseStream = S; + + type Future = T::Future; + + fn call(&mut self, req: Request<Streaming<M1>>) -> Self::Future { + Service::call(self, req) + } +} + +pub trait UnaryService<R> { + type Response; + type Future: Future<Output = Result<Response<Self::Response>, tonic::Status>>; + + fn call(&mut self, req: Request<R>) -> Self::Future; +} + +impl<T, M1, M2> UnaryService<M1> for T +where + T: Service<Request<M1>, Response = Response<M2>, Error = tonic::Status>, +{ + type Response = M2; + + type Future = T::Future; + + fn call(&mut self, req: Request<M1>) -> Self::Future { + T::call(self, req) + } +} diff --git a/dubbo/src/lib.rs b/triple/src/transport/mod.rs similarity index 92% copy from dubbo/src/lib.rs copy to triple/src/transport/mod.rs index e292e47..1f772a6 100644 --- a/dubbo/src/lib.rs +++ b/triple/src/transport/mod.rs @@ -15,7 +15,6 @@ * limitations under the License. */ -pub mod common; -pub mod helloworld; -pub mod protocol; -pub mod utils; +pub mod service; + +pub use service::DubboServer; diff --git a/triple/src/transport/service.rs b/triple/src/transport/service.rs new file mode 100644 index 0000000..8001ef5 --- /dev/null +++ b/triple/src/transport/service.rs @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; +use std::future; +use std::net::SocketAddr; +use std::task::Poll; + +use http::{Request, Response}; +use hyper::{body::Body, server::conn::AddrStream}; +use tokio::time::Duration; +use tower::ServiceExt; +use tower_service::Service; + +use crate::BoxBody; +use config::BusinessConfig; +use config::Config; + +type BoxService = tower::util::BoxService<Request<Body>, Response<BoxBody>, crate::Error>; +type BoxCloneService = tower::util::BoxCloneService<Request<Body>, Response<BoxBody>, crate::Error>; + +#[derive(Default, Clone)] +pub struct DubboServer { + accept_http2: bool, + init_stream_window_size: Option<u32>, + init_connection_window_size: Option<u32>, + max_concurrent_streams: Option<u32>, + max_frame_size: Option<u32>, + http2_keepalive_interval: Option<Duration>, + http2_keepalive_timeout: Option<Duration>, + services: HashMap<String, BoxCloneService>, +} + +impl DubboServer { + pub fn with_accpet_http1(self, accept_http2: bool) -> Self { + Self { + accept_http2: accept_http2, + ..self + } + } + + pub fn with_init_stream_window_size(self, stream_window: u32) -> Self { + Self { + init_stream_window_size: Some(stream_window), + ..self + } + } + + pub fn with_init_connection_window_size(self, connection_window: u32) -> Self { + Self { + init_connection_window_size: Some(connection_window), + ..self + } + } + pub fn with_max_concurrent_streams(self, concurrent_streams: u32) -> Self { + Self { + max_concurrent_streams: Some(concurrent_streams), + ..self + } + } + pub fn with_max_frame_size(self, max_frame_size: u32) -> Self { + Self { + max_frame_size: Some(max_frame_size), + ..self + } + } + pub fn with_http2_keepalive_interval(self, interval: Duration) -> Self { + Self { + http2_keepalive_interval: Some(interval), + ..self + } + } + + pub fn with_http2_keepalive_timeout(self, timeout: Duration) -> Self { + Self { + http2_keepalive_timeout: Some(timeout), + ..self + } + } +} + +impl DubboServer { + pub fn new() -> Self { + Self { + accept_http2: true, + init_stream_window_size: None, + init_connection_window_size: None, + max_concurrent_streams: None, + http2_keepalive_interval: None, + http2_keepalive_timeout: None, + max_frame_size: None, + services: HashMap::new(), + } + } +} + +impl DubboServer { + pub fn add_service<S>(mut self, name: String, service: S) -> Self + where + S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into<crate::Error> + Send + 'static, + { + self.services + .insert(name, service.map_err(|err| err.into()).boxed_clone()); + Self { ..self } + } + + pub async fn serve(self, name: String, addr: SocketAddr) -> Result<(), crate::Error> { + let svc = MakeSvc { + inner: self.services.get(&name).unwrap().clone(), + }; + + hyper::Server::bind(&addr) + .http2_only(self.accept_http2) + .http2_max_concurrent_streams(self.max_concurrent_streams) + .http2_initial_connection_window_size(self.init_connection_window_size) + .http2_initial_stream_window_size(self.init_stream_window_size) + .http2_keep_alive_interval(self.http2_keepalive_interval) + .http2_keep_alive_timeout(self.http2_keepalive_timeout.unwrap()) + .http2_max_frame_size(self.max_frame_size) + .serve(svc) + .await + .map_err(|err| println!("Error: {:?}", err)) + .unwrap(); + + Ok(()) + } +} + +struct MakeSvc<S> { + inner: S, +} + +impl<S> Service<&AddrStream> for MakeSvc<S> +where + S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into<crate::Error> + Send + 'static, +{ + type Response = BoxService; + type Error = crate::Error; + type Future = future::Ready<Result<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _conn: &AddrStream) -> Self::Future { + let svc = self.inner.clone(); + let s = svc.map_err(|err| err.into()).boxed(); + future::ready(Ok(s)) + } +} + +impl BusinessConfig for DubboServer { + fn init() -> Self { + let conf = config::get_global_config(); + let server = DubboServer::new() + .with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string())); + server + } + + fn load() -> Result<(), std::convert::Infallible> { + todo!() + } +}
