This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 6278851c14c61f8bab930b20338b6e859bd9b1e2 Author: yangyang <[email protected]> AuthorDate: Fri Jul 29 00:19:09 2022 +0800 refactor(triple): add router, support grpc compression, start multi server at the same time --- config/src/config.rs | 25 +++++++- config/src/service.rs | 11 ++-- dubbo/src/echo/echo_client.rs | 8 ++- dubbo/src/echo/echo_server.rs | 4 +- dubbo/src/echo/mod.rs | 53 +++++++++++----- dubbo/src/helloworld/client.rs | 4 +- dubbo/src/protocol/triple/mod.rs | 2 - dubbo/src/protocol/triple/triple_protocol.rs | 5 +- dubbo/src/protocol/triple/triple_server.rs | 3 +- triple/Cargo.toml | 1 + triple/src/client/grpc.rs | 29 +++++++-- triple/src/lib.rs | 9 +++ triple/src/main.rs | 36 ----------- triple/src/server/compression.rs | 82 ++++++++++++++++++++++++ triple/src/server/consts.rs | 16 +++++ triple/src/server/decode.rs | 53 ++++++++++++++-- triple/src/server/encode.rs | 32 ++++++++-- triple/src/server/mod.rs | 1 + triple/src/server/server.rs | 49 ++++++++++++++- triple/src/transport/mod.rs | 1 + triple/src/transport/router.rs | 94 ++++++++++++++++++++++++++++ triple/src/transport/service.rs | 23 ++++--- 22 files changed, 439 insertions(+), 102 deletions(-) diff --git a/config/src/config.rs b/config/src/config.rs index 1adf7d3..e1ff02f 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -26,7 +26,8 @@ use super::service::ServiceConfig; #[derive(Debug, Default)] pub struct RootConfig { pub name: String, - pub service: ServiceConfig, + pub service: HashMap<String, ServiceConfig>, + pub protocols: HashMap<String, ProtocolConfig>, pub data: HashMap<String, Box<dyn any::Any>>, } @@ -40,7 +41,8 @@ impl RootConfig { pub fn new() -> Self { Self { name: "dubbo".to_string(), - service: ServiceConfig::default(), + service: HashMap::new(), + protocols: HashMap::new(), data: HashMap::new(), } } @@ -50,6 +52,7 @@ impl RootConfig { .group("test".to_string()) .serializer("json".to_string()) .version("1.0.0".to_string()) + .protocol_names("triple".to_string()) .name("echo".to_string()); let triple_config = ProtocolConfig::default() @@ -58,7 +61,23 @@ impl RootConfig { .port("8888".to_string()); let service_config = service_config.add_protocol_configs(triple_config); - self.service = service_config; + self.service.insert("echo".to_string(), service_config); + self.service.insert( + "helloworld.Greeter".to_string(), + ServiceConfig::default() + .group("test".to_string()) + .serializer("json".to_string()) + .version("1.0.0".to_string()) + .name("helloworld.Greeter".to_string()) + .protocol_names("triple".to_string()), + ); + self.protocols.insert( + "triple".to_string(), + ProtocolConfig::default() + .name("triple".to_string()) + .ip("0.0.0.0".to_string()) + .port("8889".to_string()), + ); // 通过环境变量读取某个文件。加在到内存中 self.data.insert( "dubbo.provider.url".to_string(), diff --git a/config/src/service.rs b/config/src/service.rs index aeda24b..146c041 100644 --- a/config/src/service.rs +++ b/config/src/service.rs @@ -24,8 +24,8 @@ pub struct ServiceConfig { pub version: String, pub group: String, pub name: String, - pub protocol_names: Vec<String>, - pub registry_names: Vec<String>, + pub protocol: String, + pub registry: String, pub serializer: String, pub protocol_configs: HashMap<String, ProtocolConfig>, } @@ -43,11 +43,8 @@ impl ServiceConfig { Self { group, ..self } } - pub fn protocol_names(self, protocol_names: Vec<String>) -> Self { - Self { - protocol_names, - ..self - } + pub fn protocol_names(self, protocol: String) -> Self { + Self { protocol, ..self } } pub fn serializer(self, serializer: String) -> Self { diff --git a/dubbo/src/echo/echo_client.rs b/dubbo/src/echo/echo_client.rs index ebe7908..c89dad9 100644 --- a/dubbo/src/echo/echo_client.rs +++ b/dubbo/src/echo/echo_client.rs @@ -56,7 +56,7 @@ impl EchoClient { .bidi_streaming( req, codec, - http::uri::PathAndQuery::from_static("/bidi_stream"), + http::uri::PathAndQuery::from_static("/echo/bidi_stream"), ) .await } @@ -67,7 +67,11 @@ impl EchoClient { ) -> Result<Response<HelloReply>, tonic::Status> { let codec = SerdeCodec::<HelloRequest, HelloReply>::default(); self.inner - .unary(req, codec, http::uri::PathAndQuery::from_static("/hello")) + .unary( + req, + codec, + http::uri::PathAndQuery::from_static("/echo/hello"), + ) .await } } diff --git a/dubbo/src/echo/echo_server.rs b/dubbo/src/echo/echo_server.rs index 96d89f3..a2ff575 100644 --- a/dubbo/src/echo/echo_server.rs +++ b/dubbo/src/echo/echo_server.rs @@ -101,7 +101,7 @@ where fn call(&mut self, req: http::Request<B>) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/hello" => { + "/echo/hello" => { struct UnaryServer<T> { inner: _Inner<T>, } @@ -127,7 +127,7 @@ where Box::pin(fut) } - "/bidi_stream" => { + "/echo/bidi_stream" => { struct StreamingServer<T> { inner: _Inner<T>, } diff --git a/dubbo/src/echo/mod.rs b/dubbo/src/echo/mod.rs index 4df0678..7d4ff83 100644 --- a/dubbo/src/echo/mod.rs +++ b/dubbo/src/echo/mod.rs @@ -100,10 +100,7 @@ async fn test_server() { triple::transport::DubboServer::new() .add_service(name.clone(), esi) .with_http2_keepalive_timeout(Duration::from_secs(60)) - .serve( - name.clone(), - "0.0.0.0:8888".to_socket_addrs().unwrap().next().unwrap(), - ) + .serve("0.0.0.0:8888".to_socket_addrs().unwrap().next().unwrap()) .await .unwrap(); // server.add_service(esi.into()); @@ -112,10 +109,12 @@ async fn test_server() { #[tokio::test] async fn test_triple_protocol() { use crate::common::url::Url; + use crate::protocol::triple::triple_exporter::TripleExporter; use crate::protocol::triple::triple_protocol::TripleProtocol; use crate::protocol::Protocol; use config::get_global_config; - use futures::join; + use futures::future; + use futures::Future; let conf = get_global_config(); let server_name = "echo".to_string(); @@ -130,23 +129,45 @@ async fn test_triple_protocol() { crate::protocol::triple::TRIPLE_SERVICES .read() .unwrap() - .len() + .len(), ); let mut urls = Vec::<Url>::new(); - for (_, proto_conf) in conf.service.protocol_configs.iter() { - println!("{:?}", proto_conf); - let u = Url { - url: proto_conf.to_owned().to_url().clone(), - service_key: server_name.clone(), - }; + let pro = Box::new(TripleProtocol::new()); + let mut async_vec: Vec<Pin<Box<dyn Future<Output = TripleExporter> + Send>>> = Vec::new(); + for (_, c) in conf.service.iter() { + let mut u = Url::default(); + if c.protocol_configs.is_empty() { + u = Url { + url: conf + .protocols + .get(&c.protocol) + .unwrap() + .clone() + .to_url() + .clone(), + service_key: c.name.clone(), + }; + } else { + u = Url { + url: c + .protocol_configs + .get(&c.protocol) + .unwrap() + .clone() + .to_url() + .clone(), + service_key: c.name.clone(), + } + } + println!("url: {:?}", u); urls.push(u.clone()); - println!("triple server running, url: 0.0.0.0:8888, {:?}", u); - let pro = TripleProtocol::new(); - let tri_fut = pro.export(u.clone()); - let _res = join!(tri_fut); + let tri_fut = pro.clone().export(u.clone()); + async_vec.push(tri_fut); } + + let _res = future::join_all(async_vec).await; } #[allow(dead_code)] diff --git a/dubbo/src/helloworld/client.rs b/dubbo/src/helloworld/client.rs index 5fd571d..078c665 100644 --- a/dubbo/src/helloworld/client.rs +++ b/dubbo/src/helloworld/client.rs @@ -26,13 +26,13 @@ use dubbo::helloworld::helloworld::HelloRequest; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { - let mut client = GreeterClient::connect("http://[::1]:50051").await?; + let client = GreeterClient::connect("http://127.0.0.1:8889").await?; let request = tonic::Request::new(HelloRequest { name: "Tonic".into(), }); - let response = client.say_hello(request).await?; + let response = client.send_gzip().accept_gzip().say_hello(request).await?; println!("RESPONSE={:?}", response); diff --git a/dubbo/src/protocol/triple/mod.rs b/dubbo/src/protocol/triple/mod.rs index 0e84907..fed8bba 100644 --- a/dubbo/src/protocol/triple/mod.rs +++ b/dubbo/src/protocol/triple/mod.rs @@ -31,8 +31,6 @@ pub type GrpcBoxCloneService = BoxCloneService<http::Request<hyper::Body>, http::Response<BoxBody>, std::convert::Infallible>; lazy_static! { - // pub static ref DUBBO_GRPC_SERVICES: RwLock<HashMap<String, Box<dyn DubboGrpcService<GrpcInvoker> + Send + Sync + 'static>>> = - // RwLock::new(HashMap::new()); pub static ref TRIPLE_SERVICES: RwLock<HashMap<String, GrpcBoxCloneService>> = RwLock::new(HashMap::new()); } diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs index ba517bb..0aa3537 100644 --- a/dubbo/src/protocol/triple/triple_protocol.rs +++ b/dubbo/src/protocol/triple/triple_protocol.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +use std::boxed::Box; use std::collections::HashMap; use async_trait::async_trait; @@ -25,6 +26,7 @@ use super::triple_server::TripleServer; use crate::common::url::Url; use crate::protocol::Protocol; +#[derive(Clone)] pub struct TripleProtocol { servers: HashMap<String, TripleServer>, } @@ -57,8 +59,9 @@ impl Protocol for TripleProtocol { todo!() } - async fn export(self, url: Url) -> Self::Exporter { + async fn export(mut self, url: Url) -> Self::Exporter { let server = TripleServer::new(url.service_key.clone()); + self.servers.insert(url.service_key.clone(), server.clone()); server.serve(url.url.clone()).await; TripleExporter::new() diff --git a/dubbo/src/protocol/triple/triple_server.rs b/dubbo/src/protocol/triple/triple_server.rs index e49694b..207f46b 100644 --- a/dubbo/src/protocol/triple/triple_server.rs +++ b/dubbo/src/protocol/triple/triple_server.rs @@ -29,7 +29,7 @@ impl TripleServer { pub fn new(name: String) -> TripleServer { Self { name, - ..Default::default() + s: DubboServer::new(), } } @@ -46,7 +46,6 @@ impl TripleServer { server .serve( - self.name, uri.authority() .unwrap() .to_string() diff --git a/triple/Cargo.toml b/triple/Cargo.toml index df17abd..34962d7 100644 --- a/triple/Cargo.toml +++ b/triple/Cargo.toml @@ -29,6 +29,7 @@ serde_json = "1.0.82" serde = {version="1.0.138", features = ["derive"]} async-stream = "0.3" tokio-stream = "0.1" +flate2 = "1.0" # for test in codegen config = {path = "../config"} \ No newline at end of file diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs index 362a0f2..32ebb8d 100644 --- a/triple/src/client/grpc.rs +++ b/triple/src/client/grpc.rs @@ -22,6 +22,7 @@ use http::HeaderValue; use crate::codec::Codec; use crate::invocation::{IntoStreamingRequest, Request, Response}; +use crate::server::consts::CompressionEncoding; use crate::server::encode::encode; use crate::server::Streaming; @@ -29,6 +30,7 @@ use crate::server::Streaming; pub struct TripleClient { host: Option<http::Uri>, inner: ConnectionPool, + send_compression_encoding: Option<CompressionEncoding>, } #[derive(Debug, Default, Clone)] @@ -54,6 +56,7 @@ impl TripleClient { TripleClient { host: None, inner: ConnectionPool::new(), + send_compression_encoding: Some(CompressionEncoding::Gzip), } } @@ -115,6 +118,10 @@ impl TripleClient { "tri-unit-info", HeaderValue::from_static("dubbo-rust/1.0.0"), ); + if let Some(_encoding) = self.send_compression_encoding { + req.headers_mut() + .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); + } // const ( // TripleContentType = "application/grpc+proto" @@ -143,7 +150,12 @@ impl TripleClient { M2: Send + Sync + 'static, { let req = req.map(|m| stream::once(future::ready(m))); - let body_stream = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream(); + let body_stream = encode( + codec.encoder(), + req.into_inner().map(Ok), + self.send_compression_encoding, + ) + .into_stream(); let body = hyper::Body::wrap_stream(body_stream); let req = self.map_request(path, body); @@ -152,7 +164,9 @@ impl TripleClient { match response { Ok(v) => { - let resp = v.map(|body| Streaming::new(body, codec.decoder())); + let resp = v.map(|body| { + Streaming::new(body, codec.decoder(), self.send_compression_encoding) + }); let (mut parts, body) = Response::from_http(resp).into_parts(); futures_util::pin_mut!(body); @@ -185,7 +199,12 @@ impl TripleClient { M2: Send + Sync + 'static, { let req = req.into_streaming_request(); - let en = encode(codec.encoder(), req.into_inner().map(Ok)).into_stream(); + let en = encode( + codec.encoder(), + req.into_inner().map(Ok), + self.send_compression_encoding, + ) + .into_stream(); let body = hyper::Body::wrap_stream(en); let req = self.map_request(path, body); @@ -195,7 +214,9 @@ impl TripleClient { match response { Ok(v) => { - let resp = v.map(|body| Streaming::new(body, codec.decoder())); + let resp = v.map(|body| { + Streaming::new(body, codec.decoder(), self.send_compression_encoding) + }); Ok(Response::from_http(resp)) } diff --git a/triple/src/lib.rs b/triple/src/lib.rs index 42a3c6d..ddb61a5 100644 --- a/triple/src/lib.rs +++ b/triple/src/lib.rs @@ -32,3 +32,12 @@ pub fn empty_body() -> BoxBody { .map_err(|err| match err {}) .boxed_unsync() } + +pub(crate) fn boxed<B>(body: B) -> BoxBody +where + B: http_body::Body<Data = bytes::Bytes> + Send + 'static, + B::Error: Into<crate::Error>, +{ + body.map_err(|err| tonic::Status::new(tonic::Code::Internal, format!("{:?}", err.into()))) + .boxed_unsync() +} diff --git a/triple/src/main.rs b/triple/src/main.rs deleted file mode 100644 index 4350696..0000000 --- a/triple/src/main.rs +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -pub mod client; -pub mod codec; -pub mod invocation; -pub mod server; -pub mod transport; - -use http_body::Body; - -pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>; -pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, tonic::Status>; -pub fn empty_body() -> BoxBody { - http_body::Empty::new() - .map_err(|err| match err {}) - .boxed_unsync() -} - -fn main() { - println!("hello, triple"); -} diff --git a/triple/src/server/compression.rs b/triple/src/server/compression.rs new file mode 100644 index 0000000..77f66a9 --- /dev/null +++ b/triple/src/server/compression.rs @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use bytes::{Buf, BufMut, BytesMut}; +use flate2::read::{GzDecoder, GzEncoder}; +use flate2::Compression; + +use super::consts::CompressionEncoding; + +pub fn compress( + encoding: CompressionEncoding, + src: &mut BytesMut, + dst: &mut BytesMut, + len: usize, +) -> Result<(), std::io::Error> { + dst.reserve(len); + + match encoding { + CompressionEncoding::Gzip => { + let mut en = GzEncoder::new(src.reader(), Compression::default()); + + let mut dst_writer = dst.writer(); + + std::io::copy(&mut en, &mut dst_writer)?; + } + } + + Ok(()) +} + +pub fn decompress( + encoding: CompressionEncoding, + src: &mut BytesMut, + dst: &mut BytesMut, + len: usize, +) -> Result<(), std::io::Error> { + let capacity = len * 2; + dst.reserve(capacity); + + match encoding { + CompressionEncoding::Gzip => { + let mut de = GzDecoder::new(src.reader()); + + let mut dst_writer = dst.writer(); + + std::io::copy(&mut de, &mut dst_writer)?; + } + } + Ok(()) +} + +#[test] +fn test_compress() { + let mut src = BytesMut::with_capacity(super::consts::BUFFER_SIZE); + src.put(&b"test compress"[..]); + let mut dst = BytesMut::new(); + let len = src.len(); + src.reserve(len); + + compress(CompressionEncoding::Gzip, &mut src, &mut dst, len); + println!("src: {:?}, dst: {:?}", src, dst); + + let mut de_dst = BytesMut::with_capacity(super::consts::BUFFER_SIZE); + let de_len = dst.len(); + decompress(CompressionEncoding::Gzip, &mut dst, &mut de_dst, de_len); + + println!("src: {:?}, dst: {:?}", dst, de_dst); +} diff --git a/triple/src/server/consts.rs b/triple/src/server/consts.rs index 4898211..0e0f0fa 100644 --- a/triple/src/server/consts.rs +++ b/triple/src/server/consts.rs @@ -22,3 +22,19 @@ pub const HEADER_SIZE: usize = std::mem::size_of::<u8>() + // data length std::mem::size_of::<u32>(); + +#[derive(Debug, Clone, Copy)] +pub enum CompressionEncoding { + Gzip, +} + +use lazy_static::lazy_static; +use std::collections::HashMap; + +lazy_static! { + pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> = { + let mut v = HashMap::new(); + v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip)); + v + }; +} diff --git a/triple/src/server/decode.rs b/triple/src/server/decode.rs index 3c62aeb..321a4e6 100644 --- a/triple/src/server/decode.rs +++ b/triple/src/server/decode.rs @@ -23,6 +23,8 @@ use futures_util::{future, ready}; use http_body::Body; use tonic::metadata::MetadataMap; +use super::compression::decompress; +use super::consts::CompressionEncoding; use crate::codec::{DecodeBuf, Decoder}; type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, tonic::Status>; @@ -33,17 +35,19 @@ pub struct Streaming<T> { decoder: Box<dyn Decoder<Item = T, Error = tonic::Status> + Send + 'static>, buf: BytesMut, trailers: Option<MetadataMap>, + compress: Option<CompressionEncoding>, + decompress_buf: BytesMut, } #[derive(PartialEq)] enum State { ReadHeader, - ReadBody { len: usize }, + ReadBody { len: usize, is_compressed: bool }, Error, } impl<T> Streaming<T> { - pub fn new<B, D>(body: B, decoder: D) -> Self + pub fn new<B, D>(body: B, decoder: D, compress: Option<CompressionEncoding>) -> Self where B: Body + Send + 'static, B::Error: Into<crate::Error>, @@ -58,6 +62,8 @@ impl<T> Streaming<T> { decoder: Box::new(decoder), buf: BytesMut::with_capacity(super::consts::BUFFER_SIZE), trailers: None, + compress, + decompress_buf: BytesMut::new(), } } @@ -86,19 +92,54 @@ impl<T> Streaming<T> { return Ok(None); } - let _is_compressed = self.buf.get_u8(); + let is_compressed = match self.buf.get_u8() { + 0 => false, + 1 => { + if self.compress.is_some() { + true + } else { + return Err(tonic::Status::internal( + "set compression flag, but no grpc-encoding specified", + )); + } + } + v => { + return Err(tonic::Status::internal(format!( + "receive message with compression flag{}, flag should be 0 or 1", + v + ))) + } + }; let len = self.buf.get_u32() as usize; self.buf.reserve(len as usize); - self.state = State::ReadBody { len } + self.state = State::ReadBody { len, is_compressed } } - if let State::ReadBody { len } = self.state { + if let State::ReadBody { len, is_compressed } = self.state { if self.buf.remaining() < len || self.buf.len() < len { return Ok(None); } - let decoding_result = self.decoder.decode(&mut DecodeBuf::new(&mut self.buf, len)); + let decoding_result = if is_compressed { + self.decompress_buf.clear(); + if let Err(err) = decompress( + self.compress.unwrap(), + &mut self.buf, + &mut self.decompress_buf, + len, + ) { + return Err(tonic::Status::internal(err.to_string())); + } + + let decompress_len = self.decompress_buf.len(); + self.decoder.decode(&mut DecodeBuf::new( + &mut self.decompress_buf, + decompress_len, + )) + } else { + self.decoder.decode(&mut DecodeBuf::new(&mut self.buf, len)) + }; return match decoding_result { Ok(Some(r)) => { diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs index 9e4ebc1..dfa4ae7 100644 --- a/triple/src/server/encode.rs +++ b/triple/src/server/encode.rs @@ -23,10 +23,16 @@ use http_body::Body; use pin_project::pin_project; use tonic::Status; +use super::compression::compress; +use super::consts::CompressionEncoding; use crate::codec::{EncodeBuf, Encoder}; #[allow(unused_must_use)] -pub fn encode<E, B>(mut encoder: E, resp_body: B) -> impl TryStream<Ok = Bytes, Error = Status> +pub fn encode<E, B>( + mut encoder: E, + resp_body: B, + compression_encoding: Option<CompressionEncoding>, +) -> impl TryStream<Ok = Bytes, Error = Status> where E: Encoder<Error = Status>, B: Stream<Item = Result<E::Item, Status>>, @@ -35,6 +41,11 @@ where let mut buf = BytesMut::with_capacity(super::consts::BUFFER_SIZE); futures_util::pin_mut!(resp_body); + let (enable_compress, mut uncompression_buf) = match compression_encoding { + Some(CompressionEncoding::Gzip) => (true, BytesMut::with_capacity(super::consts::BUFFER_SIZE)), + None => (false, BytesMut::new()) + }; + loop { match resp_body.next().await { Some(Ok(item)) => { @@ -43,7 +54,18 @@ where unsafe { buf.advance_mut(super::consts::HEADER_SIZE); } - encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| tonic::Status::internal("encode error")); + + if enable_compress { + uncompression_buf.clear(); + + encoder.encode(item, &mut EncodeBuf::new(&mut uncompression_buf)).map_err(|_e| tonic::Status::internal("encode error")); + + let len = uncompression_buf.len(); + compress(compression_encoding.unwrap(), &mut uncompression_buf, &mut buf, len).map_err(|_| tonic::Status::internal("compress error")); + } else { + encoder.encode(item, &mut EncodeBuf::new(&mut buf)).map_err(|_e| tonic::Status::internal("encode error")); + } + let len = buf.len() - super::consts::HEADER_SIZE; { @@ -64,24 +86,26 @@ where pub fn encode_server<E, B>( encoder: E, body: B, + compression_encoding: Option<CompressionEncoding>, ) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>> where E: Encoder<Error = Status>, B: Stream<Item = Result<E::Item, Status>>, { - let s = encode(encoder, body).into_stream(); + let s = encode(encoder, body, compression_encoding).into_stream(); EncodeBody::new_server(s) } pub fn encode_client<E, B>( encoder: E, body: B, + compression_encoding: Option<CompressionEncoding>, ) -> EncodeBody<impl Stream<Item = Result<Bytes, Status>>> where E: Encoder<Error = Status>, B: Stream<Item = E::Item>, { - let s = encode(encoder, body.map(Ok)).into_stream(); + let s = encode(encoder, body.map(Ok), compression_encoding).into_stream(); EncodeBody::new_client(s) } diff --git a/triple/src/server/mod.rs b/triple/src/server/mod.rs index 8be7239..6fd5d45 100644 --- a/triple/src/server/mod.rs +++ b/triple/src/server/mod.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +pub mod compression; pub mod consts; pub mod decode; pub mod encode; diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs index 859f545..4512ddb 100644 --- a/triple/src/server/server.rs +++ b/triple/src/server/server.rs @@ -26,6 +26,9 @@ use crate::server::Streaming; use crate::BoxBody; use config::BusinessConfig; +pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding"; +pub const GRPC_ENCODING: &str = "grpc-encoding"; + pub struct TripleServer<T> { codec: T, } @@ -51,12 +54,30 @@ where B: Body + Send + 'static, B::Error: Into<crate::Error> + Send, { - let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder())); + let encoding = req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap(); + let compression = match super::consts::COMPRESSIONS.get(encoding) { + Some(val) => val.to_owned(), + None => { + let mut status = tonic::Status::unimplemented(format!( + "grpc-accept-encoding: {} not support!", + encoding + )); + + status.metadata_mut().insert( + GRPC_ACCEPT_ENCODING, + tonic::metadata::MetadataValue::from_static("gzip,identity"), + ); + + return status.to_http(); + } + }; + + let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder(), compression)); let resp = service.call(Request::from_http(req_stream)).await; let (mut parts, resp_body) = resp.unwrap().into_http().into_parts(); - let resp_body = encode_server(self.codec.encoder(), resp_body); + let resp_body = encode_server(self.codec.encoder(), resp_body, compression); parts.headers.insert( http::header::CONTENT_TYPE, @@ -76,7 +97,25 @@ where B: Body + Send + 'static, B::Error: Into<crate::Error> + Send, { - let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder())); + let encoding = req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap(); + let compression = match super::consts::COMPRESSIONS.get(encoding) { + Some(val) => val.to_owned(), + None => { + let mut status = tonic::Status::unimplemented(format!( + "grpc-accept-encoding: {} not support!", + encoding + )); + + status.metadata_mut().insert( + GRPC_ACCEPT_ENCODING, + tonic::metadata::MetadataValue::from_static("gzip,identity"), + ); + + return status.to_http(); + } + }; + + let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder(), compression)); let (parts, mut body) = Request::from_http(req_stream).into_parts(); let msg = body .try_next() @@ -90,12 +129,16 @@ where let resp_body = encode_server( self.codec.encoder(), stream::once(future::ready(resp_body)).map(Ok).into_stream(), + compression, ); parts.headers.insert( http::header::CONTENT_TYPE, http::HeaderValue::from_static("application/grpc"), ); + parts + .headers + .insert(GRPC_ENCODING, http::HeaderValue::from_static("gzip")); parts.status = http::StatusCode::OK; http::Response::from_parts(parts, BoxBody::new(resp_body)) } diff --git a/triple/src/transport/mod.rs b/triple/src/transport/mod.rs index 1f772a6..296af45 100644 --- a/triple/src/transport/mod.rs +++ b/triple/src/transport/mod.rs @@ -15,6 +15,7 @@ * limitations under the License. */ +pub mod router; pub mod service; pub use service::DubboServer; diff --git a/triple/src/transport/router.rs b/triple/src/transport/router.rs new file mode 100644 index 0000000..a64306b --- /dev/null +++ b/triple/src/transport/router.rs @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::fmt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use axum::Router; +use futures_core::Future; +use hyper::{Body, Request, Response}; +use pin_project::pin_project; +use tower::ServiceExt; +use tower_service::Service; + +use crate::BoxBody; + +#[derive(Debug, Clone, Default)] +pub struct DubboRouter { + pub router: Router, +} + +impl DubboRouter { + pub fn new() -> DubboRouter { + Self { + router: Router::new(), + } + } +} + +impl DubboRouter { + pub fn add_service<S>(mut self, name: String, service: S) -> Self + where + S: Service<Request<Body>, Response = Response<BoxBody>, Error = std::convert::Infallible> + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { + let svc = service.map_response(|res| res.map(axum::body::boxed)); + // *{bubbo} represents wildcard router + self.router = self.router.route(&format!("/{}/*dubbo", name), svc); + + self + } +} + +impl Service<Request<Body>> for DubboRouter { + type Response = Response<BoxBody>; + type Error = crate::Error; + type Future = RoutesFuture; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + RoutesFuture(self.router.call(req)) + } +} + +#[pin_project] +pub struct RoutesFuture(#[pin] axum::routing::future::RouteFuture<Body, std::convert::Infallible>); + +impl fmt::Debug for RoutesFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("RoutesFuture").finish() + } +} + +impl Future for RoutesFuture { + type Output = Result<Response<BoxBody>, crate::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match futures_util::ready!(self.project().0.poll(cx)) { + Ok(res) => Ok(res.map(crate::boxed)).into(), + Err(err) => match err {}, + } + } +} diff --git a/triple/src/transport/service.rs b/triple/src/transport/service.rs index fdd6ecb..c8a5b67 100644 --- a/triple/src/transport/service.rs +++ b/triple/src/transport/service.rs @@ -15,7 +15,6 @@ * limitations under the License. */ -use std::collections::HashMap; use std::future; use std::net::SocketAddr; use std::task::Poll; @@ -26,12 +25,12 @@ use tokio::time::Duration; use tower::ServiceExt; use tower_service::Service; +use super::router::DubboRouter; use crate::BoxBody; use config::BusinessConfig; use config::Config; type BoxService = tower::util::BoxService<Request<Body>, Response<BoxBody>, crate::Error>; -type BoxCloneService = tower::util::BoxCloneService<Request<Body>, Response<BoxBody>, crate::Error>; #[derive(Default, Clone)] pub struct DubboServer { @@ -42,7 +41,7 @@ pub struct DubboServer { max_frame_size: Option<u32>, http2_keepalive_interval: Option<Duration>, http2_keepalive_timeout: Option<Duration>, - services: HashMap<String, BoxCloneService>, + router: DubboRouter, } impl DubboServer { @@ -103,7 +102,7 @@ impl DubboServer { http2_keepalive_interval: None, http2_keepalive_timeout: None, max_frame_size: None, - services: HashMap::new(), + router: DubboRouter::new(), } } } @@ -111,19 +110,19 @@ impl DubboServer { impl DubboServer { pub fn add_service<S>(mut self, name: String, service: S) -> Self where - S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static, + S: Service<Request<Body>, Response = Response<BoxBody>, Error = std::convert::Infallible> + + Clone + + Send + + 'static, S::Future: Send + 'static, S::Error: Into<crate::Error> + Send + 'static, { - self.services - .insert(name, service.map_err(|err| err.into()).boxed_clone()); - Self { ..self } + self.router = self.router.add_service(name, service); + self } - pub async fn serve(self, name: String, addr: SocketAddr) -> Result<(), crate::Error> { - let svc = MakeSvc { - inner: self.services.get(&name).unwrap().clone(), - }; + pub async fn serve(self, addr: SocketAddr) -> Result<(), crate::Error> { + let svc = MakeSvc { inner: self.router }; let http2_keepalive_timeout = self .http2_keepalive_timeout
