This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 3f4879db08c180f8fc5e86b5b81959792808cc95
Author: yangyang <[email protected]>
AuthorDate: Sat Jul 30 12:03:06 2022 +0800

    refactor(triple): support compression option in server and client init 
process
---
 triple/src/client/grpc.rs        |  4 +-
 triple/src/server/compression.rs | 44 +++++++++++++++++-
 triple/src/server/consts.rs      | 15 -------
 triple/src/server/decode.rs      |  3 +-
 triple/src/server/encode.rs      |  5 +--
 triple/src/server/server.rs      | 97 +++++++++++++++++++++++++---------------
 6 files changed, 109 insertions(+), 59 deletions(-)

diff --git a/triple/src/client/grpc.rs b/triple/src/client/grpc.rs
index 32ebb8d..8669672 100644
--- a/triple/src/client/grpc.rs
+++ b/triple/src/client/grpc.rs
@@ -22,7 +22,7 @@ use http::HeaderValue;
 
 use crate::codec::Codec;
 use crate::invocation::{IntoStreamingRequest, Request, Response};
-use crate::server::consts::CompressionEncoding;
+use crate::server::compression::CompressionEncoding;
 use crate::server::encode::encode;
 use crate::server::Streaming;
 
@@ -122,6 +122,8 @@ impl TripleClient {
             req.headers_mut()
                 .insert("grpc-encoding", 
http::HeaderValue::from_static("gzip"));
         }
+        req.headers_mut()
+                .insert("grpc-accept-encoding", 
http::HeaderValue::from_static("gzip"));
 
         // const (
         //     TripleContentType    = "application/grpc+proto"
diff --git a/triple/src/server/compression.rs b/triple/src/server/compression.rs
index 77f66a9..268f039 100644
--- a/triple/src/server/compression.rs
+++ b/triple/src/server/compression.rs
@@ -15,11 +15,53 @@
  * limitations under the License.
  */
 
+use std::collections::HashMap;
+
 use bytes::{Buf, BufMut, BytesMut};
 use flate2::read::{GzDecoder, GzEncoder};
 use flate2::Compression;
+use lazy_static::lazy_static;
+
+pub const GRPC_ACCEPT_ENCODING: &str = "grpc-accept-encoding";
+pub const GRPC_ENCODING: &str = "grpc-encoding";
+
+#[derive(Debug, Clone, Copy)]
+pub enum CompressionEncoding {
+    Gzip,
+}
+
+lazy_static! {
+    pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> 
= {
+        let mut v = HashMap::new();
+        v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip));
+        v
+    };
+}
 
-use super::consts::CompressionEncoding;
+impl CompressionEncoding {
+    pub fn from_accept_encoding(header: &http::HeaderMap) -> 
Option<CompressionEncoding> {
+        let accept_encoding = header.get(GRPC_ACCEPT_ENCODING)?;
+        let encodings = accept_encoding.to_str().ok()?;
+
+        encodings
+            .trim()
+            .split(',')
+            .map(|s| s.trim())
+            .into_iter()
+            .find_map(|s| {
+                match s {
+                    "gzip" => Some(CompressionEncoding::Gzip),
+                    _ => None,
+                }
+            })
+    }
+
+    pub fn into_header_value(self) -> http::HeaderValue {
+        match self {
+            CompressionEncoding::Gzip => 
http::HeaderValue::from_static("gzip"),
+        }
+    }
+}
 
 pub fn compress(
     encoding: CompressionEncoding,
diff --git a/triple/src/server/consts.rs b/triple/src/server/consts.rs
index 0e0f0fa..14de7fa 100644
--- a/triple/src/server/consts.rs
+++ b/triple/src/server/consts.rs
@@ -23,18 +23,3 @@ pub const HEADER_SIZE: usize =
     // data length
     std::mem::size_of::<u32>();
 
-#[derive(Debug, Clone, Copy)]
-pub enum CompressionEncoding {
-    Gzip,
-}
-
-use lazy_static::lazy_static;
-use std::collections::HashMap;
-
-lazy_static! {
-    pub static ref COMPRESSIONS: HashMap<String, Option<CompressionEncoding>> 
= {
-        let mut v = HashMap::new();
-        v.insert("gzip".to_string(), Some(CompressionEncoding::Gzip));
-        v
-    };
-}
diff --git a/triple/src/server/decode.rs b/triple/src/server/decode.rs
index 321a4e6..f63486c 100644
--- a/triple/src/server/decode.rs
+++ b/triple/src/server/decode.rs
@@ -23,8 +23,7 @@ use futures_util::{future, ready};
 use http_body::Body;
 use tonic::metadata::MetadataMap;
 
-use super::compression::decompress;
-use super::consts::CompressionEncoding;
+use super::compression::{decompress, CompressionEncoding};
 use crate::codec::{DecodeBuf, Decoder};
 
 type BoxBody = http_body::combinators::UnsyncBoxBody<Bytes, tonic::Status>;
diff --git a/triple/src/server/encode.rs b/triple/src/server/encode.rs
index dfa4ae7..3ecdefd 100644
--- a/triple/src/server/encode.rs
+++ b/triple/src/server/encode.rs
@@ -23,8 +23,7 @@ use http_body::Body;
 use pin_project::pin_project;
 use tonic::Status;
 
-use super::compression::compress;
-use super::consts::CompressionEncoding;
+use super::compression::{compress, CompressionEncoding};
 use crate::codec::{EncodeBuf, Encoder};
 
 #[allow(unused_must_use)]
@@ -70,7 +69,7 @@ where
                     let len = buf.len() - super::consts::HEADER_SIZE;
                     {
                         let mut buf = &mut buf[..super::consts::HEADER_SIZE];
-                        buf.put_u8(1);
+                        buf.put_u8(enable_compress as u8);
                         buf.put_u32(len as u32);
                     }
 
diff --git a/triple/src/server/server.rs b/triple/src/server/server.rs
index 4512ddb..150e7f1 100644
--- a/triple/src/server/server.rs
+++ b/triple/src/server/server.rs
@@ -18,6 +18,7 @@
 use futures_util::{future, stream, StreamExt, TryStreamExt};
 use http_body::Body;
 
+use super::compression::{CompressionEncoding, COMPRESSIONS};
 use crate::codec::Codec;
 use crate::invocation::Request;
 use crate::server::encode::encode_server;
@@ -31,11 +32,15 @@ pub const GRPC_ENCODING: &str = "grpc-encoding";
 
 pub struct TripleServer<T> {
     codec: T,
+    compression: Option<CompressionEncoding>,
 }
 
 impl<T> TripleServer<T> {
     pub fn new(codec: T) -> Self {
-        Self { codec }
+        Self {
+            codec,
+            compression: None,
+        }
     }
 }
 
@@ -54,22 +59,17 @@ where
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
-        let encoding = 
req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap();
-        let compression = match super::consts::COMPRESSIONS.get(encoding) {
-            Some(val) => val.to_owned(),
-            None => {
-                let mut status = tonic::Status::unimplemented(format!(
-                    "grpc-accept-encoding: {} not support!",
-                    encoding
-                ));
-
-                status.metadata_mut().insert(
-                    GRPC_ACCEPT_ENCODING,
-                    
tonic::metadata::MetadataValue::from_static("gzip,identity"),
-                );
-
-                return status.to_http();
-            }
+        // First, read grpc-accept-encoding from the request headers to choose the response compression.
+        // Second, if the server has compression enabled and an accepted encoding is valid, 
the response should be compressed with it.
+        let mut accept_encoding = 
CompressionEncoding::from_accept_encoding(req.headers());
+        if self.compression.is_none() || accept_encoding.is_none() {
+            accept_encoding = None;
+        }
+
+        // Get grpc_encoding from http_header, decompress message.
+        let compression = match self.get_encoding_from_req(req.headers()) {
+            Ok(val) => val,
+            Err(status) => return status.to_http(),
         };
 
         let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder(), compression));
@@ -83,6 +83,11 @@ where
             http::header::CONTENT_TYPE,
             http::HeaderValue::from_static("application/grpc"),
         );
+        if let Some(encoding) = accept_encoding {
+            parts
+                .headers
+                .insert(GRPC_ENCODING, encoding.into_header_value());
+        }
         parts.status = http::StatusCode::OK;
         http::Response::from_parts(parts, BoxBody::new(resp_body))
     }
@@ -97,22 +102,14 @@ where
         B: Body + Send + 'static,
         B::Error: Into<crate::Error> + Send,
     {
-        let encoding = 
req.headers().get(GRPC_ENCODING).unwrap().to_str().unwrap();
-        let compression = match super::consts::COMPRESSIONS.get(encoding) {
-            Some(val) => val.to_owned(),
-            None => {
-                let mut status = tonic::Status::unimplemented(format!(
-                    "grpc-accept-encoding: {} not support!",
-                    encoding
-                ));
-
-                status.metadata_mut().insert(
-                    GRPC_ACCEPT_ENCODING,
-                    
tonic::metadata::MetadataValue::from_static("gzip,identity"),
-                );
-
-                return status.to_http();
-            }
+        let mut accept_encoding = 
CompressionEncoding::from_accept_encoding(req.headers());
+        if self.compression.is_none() || accept_encoding.is_none() {
+            accept_encoding = None;
+        }
+
+        let compression = match self.get_encoding_from_req(req.headers()) {
+            Ok(val) => val,
+            Err(status) => return status.to_http(),
         };
 
         let req_stream = req.map(|body| Streaming::new(body, 
self.codec.decoder(), compression));
@@ -129,19 +126,45 @@ where
         let resp_body = encode_server(
             self.codec.encoder(),
             stream::once(future::ready(resp_body)).map(Ok).into_stream(),
-            compression,
+            accept_encoding,
         );
 
         parts.headers.insert(
             http::header::CONTENT_TYPE,
             http::HeaderValue::from_static("application/grpc"),
         );
-        parts
-            .headers
-            .insert(GRPC_ENCODING, http::HeaderValue::from_static("gzip"));
+        if let Some(encoding) = accept_encoding {
+            parts
+                .headers
+                .insert(GRPC_ENCODING, encoding.into_header_value());
+        }
         parts.status = http::StatusCode::OK;
         http::Response::from_parts(parts, BoxBody::new(resp_body))
     }
+
+    fn get_encoding_from_req(
+        &self,
+        header: &http::HeaderMap,
+    ) -> Result<Option<CompressionEncoding>, tonic::Status> {
+        let encoding = header.get(GRPC_ENCODING).unwrap().to_str().unwrap();
+        let compression = match COMPRESSIONS.get(encoding) {
+            Some(val) => val.to_owned(),
+            None => {
+                let mut status = tonic::Status::unimplemented(format!(
+                    "grpc-accept-encoding: {} not support!",
+                    encoding
+                ));
+
+                status.metadata_mut().insert(
+                    GRPC_ACCEPT_ENCODING,
+                    
tonic::metadata::MetadataValue::from_static("gzip,identity"),
+                );
+
+                return Err(status);
+            }
+        };
+        Ok(compression)
+    }
 }
 
 impl<T> BusinessConfig for TripleServer<T> {

Reply via email to