This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit b6e165f1ef744e204f78427132ee5f069f2b9506 Author: yangyang <[email protected]> AuthorDate: Thu Jul 21 21:36:44 2022 +0800 refactor(triple): add prost codec, start server by config --- config/src/config.rs | 26 +++- config/src/lib.rs | 2 + config/src/{lib.rs => protocol.rs} | 35 ++++- config/src/service.rs | 74 ++++++++++ dubbo/Cargo.toml | 1 + dubbo/src/echo/echo_client.rs | 59 +++----- dubbo/src/echo/echo_server.rs | 68 ++++++--- dubbo/src/echo/helloworld.rs | 296 +++++++++++++++++++++++++++++++++++++ dubbo/src/echo/mod.rs | 94 +++++++++--- dubbo/src/lib.rs | 8 +- dubbo/src/main.rs | 8 +- triple/readme.md | 15 ++ triple/src/client/grpc.rs | 108 ++++++++++---- triple/src/codec/mod.rs | 1 + triple/src/codec/prost.rs | 236 +++++++++++++++++++++++++++++ triple/src/invocation.rs | 23 +++ triple/src/server/encode.rs | 1 - triple/src/server/server.rs | 68 +++------ triple/src/transport/service.rs | 11 +- 19 files changed, 959 insertions(+), 175 deletions(-) diff --git a/config/src/config.rs b/config/src/config.rs index a53dcd2..1adf7d3 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -17,28 +17,48 @@ use std::{any, collections::HashMap}; +use super::protocol::ProtocolConfig; +use super::service::ServiceConfig; + /// used to storage all structed config, from some source: cmd, file..; /// Impl Config trait, business init by read Config trait #[allow(dead_code)] #[derive(Debug, Default)] pub struct RootConfig { - name: String, - data: HashMap<String, Box<dyn any::Any>>, + pub name: String, + pub service: ServiceConfig, + pub data: HashMap<String, Box<dyn any::Any>>, } pub fn get_global_config() -> RootConfig { - RootConfig::new() + let mut c = RootConfig::new(); + c.load(); + c } impl RootConfig { pub fn new() -> Self { Self { name: "dubbo".to_string(), + service: ServiceConfig::default(), data: HashMap::new(), } } pub fn load(&mut self) { + let service_config = ServiceConfig::default() + .group("test".to_string()) + .serializer("json".to_string()) + .version("1.0.0".to_string()) + .name("echo".to_string()); + + let triple_config = ProtocolConfig::default() + .name("triple".to_string()) + .ip("0.0.0.0".to_string()) + .port("8888".to_string()); + + let service_config = service_config.add_protocol_configs(triple_config); + self.service = service_config; // 通过环境变量读取某个文件。加在到内存中 self.data.insert( "dubbo.provider.url".to_string(), diff --git a/config/src/lib.rs b/config/src/lib.rs index edbaf58..6237816 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -16,6 +16,8 @@ */ pub mod config; +pub mod protocol; +pub mod service; pub use config::*; diff --git a/config/src/lib.rs b/config/src/protocol.rs similarity index 54% copy from config/src/lib.rs copy to config/src/protocol.rs index edbaf58..d139bed 100644 --- a/config/src/lib.rs +++ b/config/src/protocol.rs @@ -15,15 +15,34 @@ * limitations under the License. */ -pub mod config; +use std::collections::HashMap; -pub use config::*; +#[derive(Default, Debug, Clone)] +pub struct ProtocolConfig { + pub ip: String, + pub port: String, + pub name: String, + pub params: HashMap<String, String>, +} + +impl ProtocolConfig { + pub fn name(self, name: String) -> Self { + Self { name, ..self } + } + + pub fn ip(self, ip: String) -> Self { + Self { ip, ..self } + } + + pub fn port(self, port: String) -> Self { + Self { port, ..self } + } + + pub fn params(self, params: HashMap<String, String>) -> Self { + Self { params, ..self } + } -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - let result = 2 + 2; - assert_eq!(result, 4); + pub fn to_url(self) -> String { + format!("{}://{}:{}", self.name, self.ip, self.port) } } diff --git a/config/src/service.rs b/config/src/service.rs new file mode 100644 index 0000000..aeda24b --- /dev/null +++ b/config/src/service.rs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; + +use super::protocol::ProtocolConfig; + +#[derive(Debug, Default)] +pub struct ServiceConfig { + pub version: String, + pub group: String, + pub name: String, + pub protocol_names: Vec<String>, + pub registry_names: Vec<String>, + pub serializer: String, + pub protocol_configs: HashMap<String, ProtocolConfig>, +} + +impl ServiceConfig { + pub fn name(self, name: String) -> Self { + Self { name, ..self } + } + + pub fn version(self, version: String) -> Self { + Self { version, ..self } + } + + pub fn group(self, group: String) -> Self { + Self { group, ..self } + } + + pub fn protocol_names(self, protocol_names: Vec<String>) -> Self { + Self { + protocol_names, + ..self + } + } + + pub fn serializer(self, serializer: String) -> Self { + Self { serializer, ..self } + } + + pub fn add_protocol_configs(mut self, protocol_config: ProtocolConfig) -> Self { + self.protocol_configs + .insert(protocol_config.name.clone(), protocol_config); + Self { ..self } + } + + // pub fn get_url(&self) -> Vec<Url> { + // let mut urls = Vec::new(); + // for (_, conf) in self.protocol_configs.iter() { + // urls.push(Url { + // url: conf.to_owned().to_url(), + // service_key: "".to_string(), + // }); + // } + + // urls + // } +} diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index 1a693ba..ce32cb4 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -31,6 +31,7 @@ pin-project = "1.0" serde_json = "1.0.82" serde = {version="1.0.138", features = ["derive"]} tokio-stream = "0.1" +futures = "0.3" config = {path = "../config"} triple = {path = "../triple"} \ No newline at end of file diff --git a/dubbo/src/echo/echo_client.rs b/dubbo/src/echo/echo_client.rs index 0f8215b..ebe7908 100644 --- a/dubbo/src/echo/echo_client.rs +++ b/dubbo/src/echo/echo_client.rs @@ -1,7 +1,21 @@ -use std::str::FromStr; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ use super::echo_server::{HelloReply, HelloRequest}; -use bytes::Buf; use triple::client::TripleClient; use triple::codec::serde_codec::SerdeCodec; @@ -28,10 +42,8 @@ impl EchoClient { } pub fn with_uri(mut self, uri: String) -> Self { - self.uri = uri; - self.inner = self - .inner - .with_authority(http::uri::Authority::from_str(&self.uri).unwrap()); + self.uri = uri.clone(); + self.inner = self.inner.with_host(uri); self } @@ -53,36 +65,9 @@ impl EchoClient { &self, req: Request<HelloRequest>, ) -> Result<Response<HelloReply>, tonic::Status> { - let (_parts, body) = req.into_parts(); - let v = serde_json::to_vec(&body).unwrap(); - let req = hyper::Request::builder() - .uri("http://".to_owned() + &self.uri.clone() + "/hello") - .method("POST") - .body(hyper::Body::from(v)) - .unwrap(); - - println!("request: {:?}", req); - let response = hyper::Client::builder().build_http().request(req).await; - - match response { - Ok(v) => { - println!("{:?}", v); - let (_parts, body) = v.into_parts(); - let req_body = hyper::body::to_bytes(body).await.unwrap(); - let v = req_body.chunk(); - // let codec = SerdeCodec::<HelloReply, HelloRequest>::default(); - let data: HelloReply = match serde_json::from_slice(v) { - Ok(data) => data, - Err(err) => { - return Err(tonic::Status::new(tonic::Code::Internal, err.to_string())) - } - }; - Ok(Response::new(data)) - } - Err(err) => { - println!("{}", err); - Err(tonic::Status::new(tonic::Code::Internal, err.to_string())) - } - } + let codec = SerdeCodec::<HelloRequest, HelloReply>::default(); + self.inner + .unary(req, codec, http::uri::PathAndQuery::from_static("/hello")) + .await } } diff --git a/dubbo/src/echo/echo_server.rs b/dubbo/src/echo/echo_server.rs index d6e9476..96d89f3 100644 --- a/dubbo/src/echo/echo_server.rs +++ b/dubbo/src/echo/echo_server.rs @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + use async_trait::async_trait; use tonic::codegen::BoxFuture; @@ -50,7 +67,6 @@ pub trait Echo: Send + Sync + 'static { struct _Inner<T>(Arc<T>); -#[derive(Clone)] pub struct EchoServer<T, I = TripleInvoker> { inner: _Inner<T>, invoker: Option<I>, @@ -141,23 +157,18 @@ where Box::pin(fut) } - _ => { - Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - // .body(hyper::Body::from("implement...").map_err(|err| match err {}).boxed()) - // .body(hyper::Body::from("implement...").map_err(|err| std::convert::Infallible).into()) - // .body(req.into_body()) - .body( - http_body::Empty::new() - .map_err(|err| match err {}) - .boxed_unsync(), - ) - .unwrap()) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body( + http_body::Empty::new() + .map_err(|err| match err {}) + .boxed_unsync(), + ) + .unwrap()) + }), } } } @@ -186,3 +197,24 @@ impl<T: Debug> Debug for _Inner<T> { write!(f, "Inner {:?}", self.0) } } + +impl<T: Echo, I: Invoker + Send + Sync + 'static> Clone for EchoServer<T, I> { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + invoker: None, + } + } +} + +pub fn register_echo_server<T: Echo>(server: T) { + let s = EchoServer::<_, TripleInvoker>::new(server); + crate::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert( + "echo".to_string(), + crate::utils::boxed_clone::BoxCloneService::new(s), + ); +} diff --git a/dubbo/src/echo/helloworld.rs b/dubbo/src/echo/helloworld.rs new file mode 100644 index 0000000..9497f4b --- /dev/null +++ b/dubbo/src/echo/helloworld.rs @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + + + +/// The request message containing the user's name. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloRequest { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, +} +/// The response message containing the greetings +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HelloReply { + #[prost(string, tag = "1")] + pub message: ::prost::alloc::string::String, +} +/// Generated client implementations. +pub mod greeter_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use triple::client::TripleClient; + use triple::codec::prost::ProstCodec; + use triple::invocation::*; + + /// The greeting service definition. + #[derive(Debug, Clone)] + pub struct GreeterClient { + inner: TripleClient, + uri: String, + } + // impl GreeterClient<tonic::transport::Channel> { + // /// Attempt to create a new client by connecting to a given endpoint. + // pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error> + // where + // D: std::convert::TryInto<tonic::transport::Endpoint>, + // D::Error: Into<StdError>, + // { + // let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + // Ok(Self::new(conn)) + // } + // } + impl GreeterClient { + pub fn new() -> Self { + Self { + inner: TripleClient::new(), + uri: "".to_string(), + } + } + // pub fn with_interceptor<F>( + // inner: T, + // interceptor: F, + // ) -> GreeterClient<InterceptedService<T, F>> + // where + // F: tonic::service::Interceptor, + // T::ResponseBody: Default, + // T: tonic::codegen::Service< + // http::Request<tonic::body::BoxBody>, + // Response = http::Response< + // <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody, + // >, + // >, + // <T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error: + // Into<StdError> + Send + Sync, + // { + // GreeterClient::new(InterceptedService::new(inner, interceptor)) + // } + /// Compress requests with `gzip`. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + // #[must_use] + // pub fn send_gzip(mut self) -> Self { + // self.inner = self.inner.send_gzip(); + // self + // } + // /// Enable decompressing responses with `gzip`. + // #[must_use] + // pub fn accept_gzip(mut self) -> Self { + // self.inner = self.inner.accept_gzip(); + // self + // } + /// Sends a greeting + pub async fn say_hello( + &mut self, + request: Request<super::HelloRequest>, + ) -> Result<Response<super::HelloReply>, tonic::Status> { + // self.inner.ready().await.map_err(|e| { + // tonic::Status::new( + // tonic::Code::Unknown, + // format!("Service was not ready: {}", e.into()), + // ) + // })?; + let codec = ProstCodec::<super::HelloRequest, super::HelloReply>::default(); + let path = http::uri::PathAndQuery::from_static("/helloworld.Greeter/SayHello"); + self.inner.unary(request, codec, path).await + } + } +} +/// Generated server implementations. +pub mod greeter_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + + use crate::protocol::server_desc::ServiceDesc; + use crate::protocol::triple::triple_invoker::TripleInvoker; + use crate::protocol::DubboGrpcService; + use crate::protocol::Invoker; + use crate::{BoxFuture, StdError}; + use async_trait::async_trait; + use http_body::Body; + use std::sync::Arc; + use std::task::Context; + use std::task::Poll; + use tower_service::Service; + use triple::codec::prost::ProstCodec; + use triple::empty_body; + use triple::invocation::{Request, Response}; + use triple::server::server::TripleServer; + use triple::server::service::UnaryService; + + ///Generated trait containing gRPC methods that should be implemented for use with GreeterServer. + #[async_trait] + pub trait Greeter: Send + Sync + 'static { + /// Sends a greeting + async fn say_hello( + &self, + request: Request<super::HelloRequest>, + ) -> Result<Response<super::HelloReply>, tonic::Status>; + } + /// The greeting service definition. + #[derive(Debug)] + pub struct GreeterServer<T: Greeter, I> { + inner: _Inner<T>, + invoker: Option<_Inner<I>>, + // accept_compression_encodings: EnabledCompressionEncodings, + // send_compression_encodings: EnabledCompressionEncodings, + } + struct _Inner<T>(Arc<T>); + impl<T: Greeter, I> GreeterServer<T, I> { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc<T>) -> Self { + let inner = _Inner(inner); + Self { + inner, + invoker: None, + // accept_compression_encodings: Default::default(), + // send_compression_encodings: Default::default(), + } + } + // pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F> + // where + // F: tonic::service::Interceptor, + // { + // InterceptedService::new(Self::new(inner), interceptor) + // } + // /// Enable decompressing requests with `gzip`. + // #[must_use] + // pub fn accept_gzip(mut self) -> Self { + // self.accept_compression_encodings.enable_gzip(); + // self + // } + // /// Compress responses with `gzip`, if the client supports it. + // #[must_use] + // pub fn send_gzip(mut self) -> Self { + // self.send_compression_encodings.enable_gzip(); + // self + // } + } + impl<T, B, I> Service<http::Request<B>> for GreeterServer<T, I> + where + T: Greeter, + B: Body + Send + 'static, + B::Error: Into<StdError> + Send + 'static, + { + type Response = http::Response<tonic::body::BoxBody>; + type Error = std::convert::Infallible; + type Future = BoxFuture<Self::Response, Self::Error>; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request<B>) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/helloworld.Greeter/SayHello" => { + #[allow(non_camel_case_types)] + struct SayHelloSvc<T: Greeter>(pub Arc<T>); + impl<T: Greeter> UnaryService<super::HelloRequest> for SayHelloSvc<T> { + type Response = super::HelloReply; + type Future = BoxFuture<Response<Self::Response>, tonic::Status>; + fn call(&mut self, request: Request<super::HelloRequest>) -> Self::Future { + let inner = self.0.clone(); + let fut = async move { (*inner).say_hello(request).await }; + Box::pin(fut) + } + } + // let accept_compression_encodings = self.accept_compression_encodings; + // let send_compression_encodings = self.send_compression_encodings; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = SayHelloSvc(inner); + let codec = ProstCodec::<super::HelloReply, super::HelloRequest>::default(); + // let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + // accept_compression_encodings, + // send_compression_encodings, + // ); + let mut grpc = TripleServer::new(codec); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), + } + } + } + impl<T: Greeter, I: Invoker + Send + Sync + 'static> Clone for GreeterServer<T, I> { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + invoker: None, + // invoker: if let Some(v) = self.invoker.borrow_mut() { + // Some(v.clone()) + // } else { + // None + // }, + // accept_compression_encodings: self.accept_compression_encodings, + // send_compression_encodings: self.send_compression_encodings, + } + } + } + + impl<T: Greeter> Clone for _Inner<T> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl<T: Greeter, I: Invoker> DubboGrpcService<I> for GreeterServer<T, I> { + fn set_proxy_impl(&mut self, invoker: I) { + self.invoker = Some(_Inner(Arc::new(invoker))); + } + + fn service_desc(&self) -> ServiceDesc { + ServiceDesc::new( + "helloworld.Greeter".to_string(), + std::collections::HashMap::new(), + ) + } + } + + impl<T: Greeter, I> tonic::transport::NamedService for GreeterServer<T, I> { + const NAME: &'static str = "helloworld.Greeter"; + } + + pub fn register_greeter_server<T: Greeter>(server: T) { + let s = GreeterServer::<_, TripleInvoker>::new(server); + crate::protocol::triple::TRIPLE_SERVICES + .write() + .unwrap() + .insert( + "helloworld.Greeter".to_string(), + crate::utils::boxed_clone::BoxCloneService::new(s), + ); + } +} diff --git a/dubbo/src/echo/mod.rs b/dubbo/src/echo/mod.rs index 356fa9a..4df0678 100644 --- a/dubbo/src/echo/mod.rs +++ b/dubbo/src/echo/mod.rs @@ -1,5 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + pub mod echo_client; pub mod echo_server; +pub mod helloworld; use futures_util::Stream; use futures_util::StreamExt; @@ -21,7 +39,7 @@ async fn test_client() { use futures_util::StreamExt; use triple::invocation::*; - let cli = EchoClient::new().with_uri("127.0.0.1:8888".to_string()); + let cli = EchoClient::new().with_uri("http://127.0.0.1:8888".to_string()); let resp = cli .say_hello(Request::new(HelloRequest { name: "message from client".to_string(), @@ -49,9 +67,8 @@ async fn test_client() { let bidi_resp = cli.bidirectional_streaming_echo(req).await.unwrap(); - let (_parts, mut body) = bidi_resp.into_parts(); - // let trailer = body.trailer().await.unwrap(); - // println!("trailer: {:?}", trailer); + let (parts, mut body) = bidi_resp.into_parts(); + println!("parts: {:?}", parts); while let Some(item) = body.next().await { match item { Ok(v) => { @@ -97,26 +114,39 @@ async fn test_triple_protocol() { use crate::common::url::Url; use crate::protocol::triple::triple_protocol::TripleProtocol; use crate::protocol::Protocol; - use crate::utils::boxed_clone::BoxCloneService; + use config::get_global_config; + use futures::join; - // crate::init::init(); + let conf = get_global_config(); + let server_name = "echo".to_string(); - let esi = EchoServer::<EchoServerImpl>::new(EchoServerImpl { - name: "echo".to_string(), + echo_server::register_echo_server(EchoServerImpl { + name: server_name.clone(), }); + helloworld::greeter_server::register_greeter_server(GreeterImpl {}); + println!("root config: {:?}", conf); + println!( + "register service num: {:?}", + crate::protocol::triple::TRIPLE_SERVICES + .read() + .unwrap() + .len() + ); + + let mut urls = Vec::<Url>::new(); + for (_, proto_conf) in conf.service.protocol_configs.iter() { + println!("{:?}", proto_conf); + let u = Url { + url: proto_conf.to_owned().to_url().clone(), + service_key: server_name.clone(), + }; + urls.push(u.clone()); - crate::protocol::triple::TRIPLE_SERVICES - .write() - .unwrap() - .insert("echo".to_string(), BoxCloneService::new(esi)); - - println!("triple server running, url: 0.0.0.0:8888"); - let pro = TripleProtocol::new(); - pro.export(Url { - url: "0.0.0.0:8888".to_string(), - service_key: "echo".to_string(), - }) - .await; + println!("triple server running, url: 0.0.0.0:8888, {:?}", u); + let pro = TripleProtocol::new(); + let tri_fut = pro.export(u.clone()); + let _res = join!(tri_fut); + } } #[allow(dead_code)] @@ -132,7 +162,7 @@ impl Echo for EchoServerImpl { &self, req: Request<HelloRequest>, ) -> Result<Response<HelloReply>, tonic::Status> { - println!("EchoServer::hello {:?}", req.message); + println!("EchoServer::hello {:?}", req.metadata); Ok(Response::new(HelloReply { reply: "hello, dubbo-rust".to_string(), @@ -145,7 +175,10 @@ impl Echo for EchoServerImpl { &self, request: Request<triple::server::Streaming<HelloRequest>>, ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, tonic::Status> { - println!("EchoServer::bidirectional_streaming_echo"); + println!( + "EchoServer::bidirectional_streaming_echo, grpc header: {:?}", + request.metadata + ); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(128); @@ -212,3 +245,20 @@ fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> { }; } } + +use helloworld::greeter_server::Greeter; + +struct GreeterImpl {} + +#[async_trait] +impl Greeter for GreeterImpl { + async fn say_hello( + &self, + request: Request<helloworld::HelloRequest>, + ) -> Result<Response<helloworld::HelloReply>, tonic::Status> { + println!("greeter: req: {:?}", request.metadata); + Ok(Response::new(helloworld::HelloReply { + message: "hello, rust".to_string(), + })) + } +} diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs index 90ceb1c..1740648 100644 --- a/dubbo/src/lib.rs +++ b/dubbo/src/lib.rs @@ -18,9 +18,11 @@ pub mod common; pub mod echo; pub mod helloworld; -pub mod init; pub mod protocol; -pub mod registry; pub mod utils; -pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; +use std::future::Future; +use std::pin::Pin; + +pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>; +pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>; diff --git a/dubbo/src/main.rs b/dubbo/src/main.rs index db99336..baa5af4 100644 --- a/dubbo/src/main.rs +++ b/dubbo/src/main.rs @@ -18,12 +18,14 @@ pub mod common; pub mod echo; pub mod helloworld; -pub mod init; pub mod protocol; -pub mod registry; pub mod utils; -pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; +use std::future::Future; +use std::pin::Pin; + +pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>; +pub type BoxFuture<T, E> = self::Pin<Box<dyn self::Future<Output = Result<T, E>> + Send + 'static>>; #[tokio::main] async fn main() { diff --git a/triple/readme.md b/triple/readme.md new file mode 100644 index 0000000..c969a78 --- /dev/null +++ b/triple/readme.md @@ -0,0 +1,15 @@ +# Triple Protocol + +Triple协议使用了hyper作为C/S之间的通信层,支持triple spec中自定义的header。 + +整体模块划分为: ++ codec ++ server: + + Router + + 网络事件处理层 + + 线程池:参考triple-go的线程模型 ++ client: + + 线程池 + + 连接处理层,类似于grpc channel层 + +Triple支持tls、grpc压缩 \ No newline at end of file diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs index 1277868..362a0f2 100644 --- a/triple/src/client/grpc.rs +++ b/triple/src/client/grpc.rs @@ -15,17 +15,19 @@ * limitations under the License. */ -use futures_util::{StreamExt, TryStreamExt}; +use std::str::FromStr; + +use futures_util::{future, stream, StreamExt, TryStreamExt}; use http::HeaderValue; use crate::codec::Codec; -use crate::invocation::{IntoStreamingRequest, Response}; +use crate::invocation::{IntoStreamingRequest, Request, Response}; use crate::server::encode::encode; use crate::server::Streaming; #[derive(Debug, Clone, Default)] pub struct TripleClient { - host: Option<http::uri::Authority>, + host: Option<http::Uri>, inner: ConnectionPool, } @@ -55,52 +57,42 @@ impl TripleClient { } } - pub fn with_authority(self, host: http::uri::Authority) -> Self { + /// host: http://0.0.0.0:8888 + pub fn with_host(self, host: String) -> Self { + let uri = http::Uri::from_str(&host).unwrap(); TripleClient { - host: Some(host), + host: Some(uri), ..self } } } impl TripleClient { - pub async fn bidi_streaming<C, M1, M2>( - &mut self, - req: impl IntoStreamingRequest<Message = M1>, - mut codec: C, + fn map_request( + &self, path: http::uri::PathAndQuery, - ) -> Result<Response<Streaming<M2>>, tonic::Status> - where - C: Codec<Encode = M1, Decode = M2>, - M1: Send + Sync + 'static, - M2: Send + Sync + 'static, - { - let req = req.into_streaming_request(); - let en = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream(); - let body = hyper::Body::wrap_stream(en); - - let mut parts = http::uri::Parts::default(); + body: hyper::Body, + ) -> http::Request<hyper::Body> { + let mut parts = self.host.clone().unwrap().into_parts(); parts.path_and_query = Some(path); - parts.authority = self.host.clone(); - parts.scheme = Some(http::uri::Scheme::HTTP); let uri = http::Uri::from_parts(parts).unwrap(); - let mut req = hyper::Request::builder() .version(http::Version::HTTP_2) .uri(uri.clone()) .method("POST") .body(body) .unwrap(); + *req.version_mut() = http::Version::HTTP_2; req.headers_mut() .insert("method", HeaderValue::from_static("POST")); req.headers_mut().insert( "scheme", - HeaderValue::from_str(uri.clone().scheme_str().unwrap()).unwrap(), + HeaderValue::from_str(uri.scheme_str().unwrap()).unwrap(), ); req.headers_mut() - .insert("path", HeaderValue::from_str(uri.clone().path()).unwrap()); + .insert("path", HeaderValue::from_str(uri.path()).unwrap()); req.headers_mut().insert( "authority", HeaderValue::from_str(uri.authority().unwrap().as_str()).unwrap(), @@ -136,20 +128,78 @@ impl TripleClient { // TripleTraceProtoBin = "tri-trace-proto-bin" // TripleUnitInfo = "tri-unit-info" // ) + req + } + + pub async fn unary<C, M1, M2>( + &self, + req: Request<M1>, + mut codec: C, + path: http::uri::PathAndQuery, + ) -> Result<Response<M2>, tonic::Status> + where + C: Codec<Encode = M1, Decode = M2>, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, + { + let req = req.map(|m| stream::once(future::ready(m))); + let body_stream = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream(); + let body = hyper::Body::wrap_stream(body_stream); + let req = self.map_request(path, body); let cli = self.inner.clone().builder(); let response = cli.request(req).await; match response { Ok(v) => { let resp = v.map(|body| Streaming::new(body, codec.decoder())); + let (mut parts, body) = Response::from_http(resp).into_parts(); + + futures_util::pin_mut!(body); + + let message = body.try_next().await?.ok_or_else(|| { + tonic::Status::new(tonic::Code::Internal, "Missing response message.") + })?; - let (_parts, body) = resp.into_parts(); - Ok(Response::new(body)) + if let Some(trailers) = body.trailer().await? { + let mut h = parts.into_headers(); + h.extend(trailers.into_headers()); + parts = tonic::metadata::MetadataMap::from_headers(h); + } + + Ok(Response::from_parts(parts, message)) } - Err(err) => { - Err(tonic::Status::new(tonic::Code::Internal, err.to_string())) + Err(err) => Err(tonic::Status::new(tonic::Code::Internal, err.to_string())), + } + } + + pub async fn bidi_streaming<C, M1, M2>( + &mut self, + req: impl IntoStreamingRequest<Message = M1>, + mut codec: C, + path: http::uri::PathAndQuery, + ) -> Result<Response<Streaming<M2>>, tonic::Status> + where + C: Codec<Encode = M1, Decode = M2>, + M1: Send + Sync + 'static, + M2: Send + Sync + 'static, + { + let req = req.into_streaming_request(); + let en = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream(); + let body = hyper::Body::wrap_stream(en); + + let req = self.map_request(path, body); + + let cli = self.inner.clone().builder(); + let response = cli.request(req).await; + + match response { + Ok(v) => { + let resp = v.map(|body| Streaming::new(body, codec.decoder())); + + Ok(Response::from_http(resp)) } + Err(err) => Err(tonic::Status::new(tonic::Code::Internal, err.to_string())), } } } diff --git a/triple/src/codec/mod.rs b/triple/src/codec/mod.rs index f6e61d9..c21b69a 100644 --- a/triple/src/codec/mod.rs +++ b/triple/src/codec/mod.rs @@ -16,6 +16,7 @@ */ pub mod buffer; +pub mod prost; pub mod serde_codec; use std::io; diff --git a/triple/src/codec/prost.rs b/triple/src/codec/prost.rs new file mode 100644 index 0000000..162ac51 --- /dev/null +++ b/triple/src/codec/prost.rs @@ -0,0 +1,236 @@ +use super::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; +use prost::Message; +use std::marker::PhantomData; +use tonic::{Code, Status}; + +/// A [`Codec`] that implements `application/grpc+proto` via the prost library.. +#[derive(Debug, Clone)] +pub struct ProstCodec<T, U> { + _pd: PhantomData<(T, U)>, +} + +impl<T, U> Default for ProstCodec<T, U> { + fn default() -> Self { + Self { _pd: PhantomData } + } +} + +impl<T, U> Codec for ProstCodec<T, U> +where + T: Message + Send + 'static, + U: Message + Default + Send + 'static, +{ + type Encode = T; + type Decode = U; + + type Encoder = ProstEncoder<T>; + type Decoder = ProstDecoder<U>; + + fn encoder(&mut self) -> Self::Encoder { + ProstEncoder(PhantomData) + } + + fn decoder(&mut self) -> Self::Decoder { + ProstDecoder(PhantomData) + } +} + +/// A [`Encoder`] that knows how to encode `T`. +#[derive(Debug, Clone, Default)] +pub struct ProstEncoder<T>(PhantomData<T>); + +impl<T: Message> Encoder for ProstEncoder<T> { + type Item = T; + type Error = Status; + + fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { + item.encode(buf) + .expect("Message only errors if not enough space"); + + Ok(()) + } +} + +/// A [`Decoder`] that knows how to decode `U`. +#[derive(Debug, Clone, Default)] +pub struct ProstDecoder<U>(PhantomData<U>); + +impl<U: Message + Default> Decoder for ProstDecoder<U> { + type Item = U; + type Error = Status; + + fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> { + let item = Message::decode(buf) + .map(Option::Some) + .map_err(from_decode_error)?; + + Ok(item) + } +} + +fn from_decode_error(error: prost::DecodeError) -> Status { + // Map Protobuf parse errors to an INTERNAL status code, as per + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + Status::new(Code::Internal, error.to_string()) +} + +// #[cfg(test)] +// mod tests { +// use crate::codec::compression::SingleMessageCompressionOverride; +// use crate::codec::{ +// encode_server, DecodeBuf, Decoder, EncodeBuf, Encoder, Streaming, HEADER_SIZE, +// }; +// use crate::Status; +// use bytes::{Buf, BufMut, BytesMut}; +// use http_body::Body; + +// const LEN: usize = 10000; + +// #[tokio::test] +// async fn decode() { +// let decoder = MockDecoder::default(); + +// let msg = vec![0u8; LEN]; + +// let mut buf = BytesMut::new(); + +// buf.reserve(msg.len() + HEADER_SIZE); +// buf.put_u8(0); +// buf.put_u32(msg.len() as u32); + +// buf.put(&msg[..]); + +// let body = body::MockBody::new(&buf[..], 10005, 0); + +// let mut stream = Streaming::new_request(decoder, body, None); + +// let mut i = 0usize; +// while let Some(output_msg) = stream.message().await.unwrap() { +// assert_eq!(output_msg.len(), msg.len()); +// i += 1; +// } +// assert_eq!(i, 1); +// } + +// #[tokio::test] +// async fn encode() { +// let encoder = MockEncoder::default(); + +// let msg = Vec::from(&[0u8; 1024][..]); + +// let messages = std::iter::repeat_with(move || Ok::<_, Status>(msg.clone())).take(10000); +// let source = futures_util::stream::iter(messages); + +// let body = encode_server( +// encoder, +// source, +// None, +// SingleMessageCompressionOverride::default(), +// ); + +// futures_util::pin_mut!(body); + +// while let Some(r) = body.data().await { +// r.unwrap(); +// } +// } + +// #[derive(Debug, Clone, Default)] +// struct MockEncoder; + +// impl Encoder for MockEncoder { +// type Item = Vec<u8>; +// type Error = Status; + +// fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> { +// buf.put(&item[..]); +// Ok(()) +// } +// } + +// #[derive(Debug, Clone, Default)] +// struct MockDecoder; + +// impl Decoder for MockDecoder { +// type Item = Vec<u8>; +// type Error = Status; + +// fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> { +// let out = Vec::from(buf.chunk()); +// buf.advance(LEN); +// Ok(Some(out)) +// } +// } + +// mod body { +// use crate::Status; +// use bytes::Bytes; +// use http_body::Body; +// use std::{ +// pin::Pin, +// task::{Context, Poll}, +// }; + +// #[derive(Debug)] +// pub(super) struct MockBody { +// data: Bytes, + +// // the size of the partial message to send +// partial_len: usize, + +// // the number of times we've sent +// count: usize, +// } + +// impl MockBody { +// pub(super) fn new(b: &[u8], partial_len: usize, count: usize) -> Self { +// MockBody { +// data: Bytes::copy_from_slice(b), +// partial_len, +// count, +// } +// } +// } + +// impl Body for MockBody { +// type Data = Bytes; +// type Error = Status; + +// fn poll_data( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// ) -> Poll<Option<Result<Self::Data, Self::Error>>> { +// // every other call to poll_data returns data +// let should_send = self.count % 2 == 0; +// let data_len = self.data.len(); +// let partial_len = self.partial_len; +// let count = self.count; +// if data_len > 0 { +// let result = if should_send { +// let response = +// self.data +// .split_to(if count == 0 { partial_len } else { data_len }); +// Poll::Ready(Some(Ok(response))) +// } else { +// cx.waker().wake_by_ref(); +// Poll::Pending +// }; +// // make some fake progress +// self.count += 1; +// result +// } else { +// Poll::Ready(None) +// } +// } + +// #[allow(clippy::drop_ref)] +// fn poll_trailers( +// self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { +// drop(cx); +// Poll::Ready(Ok(None)) +// } +// } +// } +// } diff --git a/triple/src/invocation.rs b/triple/src/invocation.rs index 1655312..de2e36e 100644 --- a/triple/src/invocation.rs +++ b/triple/src/invocation.rs @@ -39,6 +39,10 @@ impl<T> Request<T> { (self.metadata, self.message) } + pub fn from_parts(metadata: MetadataMap, message: T) -> Self { + Request { message, metadata } + } + pub fn from_http(req: http::Request<T>) -> Self { let (parts, body) = req.into_parts(); Request { @@ -54,6 +58,17 @@ impl<T> Request<T> { http_req } + + pub fn map<F, U>(self, f: F) -> Request<U> + where + F: FnOnce(T) -> U, + { + let m = f(self.message); + Request { + message: m, + metadata: self.metadata, + } + } } pub struct Response<T> { @@ -85,6 +100,14 @@ impl<T> Response<T> { http_resp } + pub fn from_http(resp: http::Response<T>) -> Self { + let (part, body) = resp.into_parts(); + Response { + message: body, + metadata: MetadataMap::from_headers(part.headers), + } + } + pub fn map<F, U>(self, f: F) -> Response<U> where F: FnOnce(T) -> U, diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs index c76f448..9e4ebc1 100644 --- a/triple/src/server/encode.rs +++ b/triple/src/server/encode.rs @@ -162,7 +162,6 @@ where tonic::Status::ok("") }; let http = status.to_http(); - println!("status: {:?}", http.headers().clone()); Poll::Ready(Ok(Some(http.headers().to_owned()))) } diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs index 6474d6c..859f545 100644 --- a/triple/src/server/server.rs +++ b/triple/src/server/server.rs @@ -15,11 +15,10 @@ * limitations under the License. */ -use bytes::{Buf, BytesMut}; +use futures_util::{future, stream, StreamExt, TryStreamExt}; use http_body::Body; -use std::fmt::Debug; -use crate::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder}; +use crate::codec::Codec; use crate::invocation::Request; use crate::server::encode::encode_server; use crate::server::service::{StreamingSvc, UnaryService}; @@ -75,55 +74,30 @@ where where S: UnaryService<T::Decode, Response = T::Encode>, B: Body + Send + 'static, - B::Error: Debug, + B::Error: Into<crate::Error> + Send, { - let (_parts, body) = req.into_parts(); - let req_body = hyper::body::to_bytes(body).await.unwrap(); - let v = req_body.chunk(); - let mut req_byte = BytesMut::from(v); - let mut de = DecodeBuf::new(&mut req_byte, v.len()); - let decoder = self - .codec - .decoder() - .decode(&mut de) - .map(|v| v.unwrap()) - .unwrap(); - let req = Request::new(decoder); - - let resp = service.call(req).await; - - let resp = match resp { - Ok(r) => r, - Err(status) => { - let (mut parts, _body) = http::Response::new(()).into_parts(); - parts.headers.insert( - http::header::CONTENT_TYPE, - http::HeaderValue::from_static("application/grpc"), - ); - parts.status = status.to_http().status(); - - return http::Response::from_parts(parts, crate::empty_body()); - } - }; - let (mut parts, body) = resp.into_http().into_parts(); + let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder())); + let (parts, mut body) = Request::from_http(req_stream).into_parts(); + let msg = body + .try_next() + .await + .unwrap() + .ok_or_else(|| tonic::Status::new(tonic::Code::Unknown, "request wrong")); - // let data = hyper::body::aggregate(body) - // let b = body.size_hint(); - let mut bytes = BytesMut::with_capacity(100); - let mut dst = EncodeBuf::new(&mut bytes); - let _res = self.codec.encoder().encode(body, &mut dst); - let data = bytes.to_vec(); + let resp = service.call(Request::from_parts(parts, msg.unwrap())).await; - let resp_body = hyper::Body::from(data); + let (mut parts, resp_body) = resp.unwrap().into_http().into_parts(); + let resp_body = encode_server( + self.codec.encoder(), + stream::once(future::ready(resp_body)).map(Ok).into_stream(), + ); + parts.headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/grpc"), + ); parts.status = http::StatusCode::OK; - // http::Response::from_parts(parts, resp_body.map_err(|err| err.into()).boxed_unsync()) - http::Response::from_parts( - parts, - resp_body - .map_err(|err| tonic::Status::new(tonic::Code::Internal, err.to_string())) - .boxed_unsync(), - ) + http::Response::from_parts(parts, BoxBody::new(resp_body)) } } diff --git a/triple/src/transport/service.rs b/triple/src/transport/service.rs index e470601..fdd6ecb 100644 --- a/triple/src/transport/service.rs +++ b/triple/src/transport/service.rs @@ -125,17 +125,21 @@ impl DubboServer { inner: self.services.get(&name).unwrap().clone(), }; + let http2_keepalive_timeout = self + .http2_keepalive_timeout + .unwrap_or_else(|| Duration::new(60, 0)); + hyper::Server::bind(&addr) .http2_only(self.accept_http2) .http2_max_concurrent_streams(self.max_concurrent_streams) .http2_initial_connection_window_size(self.init_connection_window_size) .http2_initial_stream_window_size(self.init_stream_window_size) .http2_keep_alive_interval(self.http2_keepalive_interval) - .http2_keep_alive_timeout(self.http2_keepalive_timeout.unwrap()) + .http2_keep_alive_timeout(http2_keepalive_timeout) .http2_max_frame_size(self.max_frame_size) .serve(svc) .await - .map_err(|err| println!("Error: {:?}", err)) + .map_err(|err| println!("hyper serve, Error: {:?}", err)) .unwrap(); Ok(()) @@ -170,8 +174,7 @@ where impl BusinessConfig for DubboServer { fn init() -> Self { let conf = config::get_global_config(); - DubboServer::new() - .with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string())) + DubboServer::new().with_accpet_http1(conf.bool("dubbo.server.accept_http2".to_string())) } fn load() -> Result<(), std::convert::Infallible> {
