This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 3f4879db08c180f8fc5e86b5b81959792808cc95 Author: yangyang <[email protected]> AuthorDate: Sat Jul 30 12:03:06 2022 +0800 refactor(triple): support compression option in server and client init process --- triple/src/client/grpc.rs | 4 +- triple/src/server/compression.rs | 44 +++++++++++++++++- triple/src/server/consts.rs | 15 ------- triple/src/server/decode.rs | 3 +- triple/src/server/encode.rs | 5 +-- triple/src/server/server.rs | 97 +++++++++++++++++++++++++--------------- 6 files changed, 109 insertions(+), 59 deletions(-) diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs index 32ebb8d..8669672 100644 --- a/triple/src/client/grpc.rs +++ b/triple/src/client/grpc.rs @@ -22,7 +22,7 @@ use http::HeaderValue; use crate::codec::Codec; use crate::invocation::{IntoStreamingRequest, Request, Response}; -use crate::server::consts::CompressionEncoding; +use crate::server::compression::CompressionEncoding; use crate::server::encode::encode; use crate::server::Streaming; @@ -122,6 +122,8 @@ impl TripleClient { req.headers_mut() .insert("grpc-encoding", http::HeaderValue::from_static("gzip")); } + req.headers_mut() + .insert("grpc-accept-encoding", http::HeaderValue::from_static("gzip")); // const ( // TripleContentType = "application/grpc+proto" diff --git a/triple/src/server/compression.rs b/triple/src/server/compression.rs index 77f66a9..268f039 100644 --- a/triple/src/server/compression.rs +++ b/triple/src/server/compression.rs @@ -15,11 +15,53 @@ * limitations under the License. */ +use std::collections::HashMap; + use bytes::{Buf, BufMut, BytesMut}; use flate2::read::{GzDecoder, GzEncoder}; use flate2::Compression; +use lazy_static::lazy_static; + +pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding"; +pub const GRPC_ENCODING: &str = "grpc-encoding"; + +#[derive(Debug, Clone, Copy)] +pub enum CompressionEncoding { + Gzip, +} + +lazy_static! 
{ + pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> = { + let mut v = HashMap::new(); + v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip)); + v + }; +} -use super::consts::CompressionEncoding; +impl CompressionEncoding { + pub fn from_accept_encoding(header: &http::HeaderMap) -> Option<CompressionEncoding> { + let accept_encoding = header.get(GRPC_ACCEPT_ENCODING)?; + let encodings = accept_encoding.to_str().ok()?; + + encodings + .trim() + .split(',') + .map(|s| s.trim()) + .into_iter() + .find_map(|s| { + match s { + "gzip" => Some(CompressionEncoding::Gzip), + _ => None, + } + }) + } + + pub fn into_header_value(self) -> http::HeaderValue { + match self { + CompressionEncoding::Gzip => http::HeaderValue::from_static("gzip"), + } + } +} pub fn compress( encoding: CompressionEncoding, diff --git a/triple/src/server/consts.rs b/triple/src/server/consts.rs index 0e0f0fa..14de7fa 100644 --- a/triple/src/server/consts.rs +++ b/triple/src/server/consts.rs @@ -23,18 +23,3 @@ pub const HEADER_SIZE: usize = // data length std::mem::size_of::<u32>(); -#[derive(Debug, Clone, Copy)] -pub enum CompressionEncoding { - Gzip, -} - -use lazy_static::lazy_static; -use std::collections::HashMap; - -lazy_static! 
{ - pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> = { - let mut v = HashMap::new(); - v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip)); - v - }; -} diff --git a/triple/src/server/decode.rs b/triple/src/server/decode.rs index 321a4e6..f63486c 100644 --- a/triple/src/server/decode.rs +++ b/triple/src/server/decode.rs @@ -23,8 +23,7 @@ use futures_util::{future, ready}; use http_body::Body; use tonic::metadata::MetadataMap; -use super::compression::decompress; -use super::consts::CompressionEncoding; +use super::compression::{decompress, CompressionEncoding}; use crate::codec::{DecodeBuf, Decoder}; type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, tonic::Status>; diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs index dfa4ae7..3ecdefd 100644 --- a/triple/src/server/encode.rs +++ b/triple/src/server/encode.rs @@ -23,8 +23,7 @@ use http_body::Body; use pin_project::pin_project; use tonic::Status; -use super::compression::compress; -use super::consts::CompressionEncoding; +use super::compression::{compress, CompressionEncoding}; use crate::codec::{EncodeBuf, Encoder}; #[allow(unused_must_use)] @@ -70,7 +69,7 @@ where let len = buf.len() - super::consts::HEADER_SIZE; { let mut buf = &mut buf[..super::consts::HEADER_SIZE]; - buf.put_u8(1); + buf.put_u8(enable_compress as u8); buf.put_u32(len as u32); } diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs index 4512ddb..150e7f1 100644 --- a/triple/src/server/server.rs +++ b/triple/src/server/server.rs @@ -18,6 +18,7 @@ use futures_util::{future, stream, StreamExt, TryStreamExt}; use http_body::Body; +use super::compression::{CompressionEncoding, COMPRESSIONS}; use crate::codec::Codec; use crate::invocation::Request; use crate::server::encode::encode_server; @@ -31,11 +32,15 @@ pub const GRPC_ENCODING: &str = "grpc-encoding"; pub struct TripleServer<T> { codec: T, + compression: Option<CompressionEncoding>, } impl<T> TripleServer<T> { 
pub fn new(codec: T) -> Self { - Self { codec } + Self { + codec, + compression: None, + } } } @@ -54,22 +59,17 @@ where B: Body + Send + 'static, B::Error: Into<crate::Error> + Send, { - let encoding = req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap(); - let compression = match super::consts::COMPRESSIONS.get(encoding) { - Some(val) => val.to_owned(), - None => { - let mut status = tonic::Status::unimplemented(format!( - "grpc-accept-encoding: {} not support!", - encoding - )); - - status.metadata_mut().insert( - GRPC_ACCEPT_ENCODING, - tonic::metadata::MetadataValue::from_static("gzip,identity"), - ); - - return status.to_http(); - } + // Firstly, get grpc_accept_encoding from http_header, get compression + // Secondly, if server enable compression and compression is valid, this method should compress response + let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers()); + if self.compression.is_none() || accept_encoding.is_none() { + accept_encoding = None; + } + + // Get grpc_encoding from http_header, decompress message. 
+ let compression = match self.get_encoding_from_req(req.headers()) { + Ok(val) => val, + Err(status) => return status.to_http(), }; let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder(), compression)); @@ -83,6 +83,11 @@ where http::header::CONTENT_TYPE, http::HeaderValue::from_static("application/grpc"), ); + if let Some(encoding) = accept_encoding { + parts + .headers + .insert(GRPC_ENCODING, encoding.into_header_value()); + } parts.status = http::StatusCode::OK; http::Response::from_parts(parts, BoxBody::new(resp_body)) } @@ -97,22 +102,14 @@ where B: Body + Send + 'static, B::Error: Into<crate::Error> + Send, { - let encoding = req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap(); - let compression = match super::consts::COMPRESSIONS.get(encoding) { - Some(val) => val.to_owned(), - None => { - let mut status = tonic::Status::unimplemented(format!( - "grpc-accept-encoding: {} not support!", - encoding - )); - - status.metadata_mut().insert( - GRPC_ACCEPT_ENCODING, - tonic::metadata::MetadataValue::from_static("gzip,identity"), - ); - - return status.to_http(); - } + let mut accept_encoding = CompressionEncoding::from_accept_encoding(req.headers()); + if self.compression.is_none() || accept_encoding.is_none() { + accept_encoding = None; + } + + let compression = match self.get_encoding_from_req(req.headers()) { + Ok(val) => val, + Err(status) => return status.to_http(), }; let req_stream = req.map(|body| Streaming::new(body, self.codec.decoder(), compression)); @@ -129,19 +126,45 @@ where let resp_body = encode_server( self.codec.encoder(), stream::once(future::ready(resp_body)).map(Ok).into_stream(), - compression, + accept_encoding, ); parts.headers.insert( http::header::CONTENT_TYPE, http::HeaderValue::from_static("application/grpc"), ); - parts - .headers - .insert(GRPC_ENCODING, http::HeaderValue::from_static("gzip")); + if let Some(encoding) = accept_encoding { + parts + .headers + .insert(GRPC_ENCODING, 
encoding.into_header_value()); + } parts.status = http::StatusCode::OK; http::Response::from_parts(parts, BoxBody::new(resp_body)) } + + fn get_encoding_from_req( + &self, + header: &http::HeaderMap, + ) -> Result<Option<CompressionEncoding>, tonic::Status> { + let encoding = header.get(GRPC_ENCODING).unwrap().to_str().unwrap(); + let compression = match COMPRESSIONS.get(encoding) { + Some(val) => val.to_owned(), + None => { + let mut status = tonic::Status::unimplemented(format!( + "grpc-accept-encoding: {} not support!", + encoding + )); + + status.metadata_mut().insert( + GRPC_ACCEPT_ENCODING, + tonic::metadata::MetadataValue::from_static("gzip,identity"), + ); + + return Err(status); + } + }; + Ok(compression) + } } impl<T> BusinessConfig for TripleServer<T> {
