This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-idl
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 13d77d437ac523915cb3c9da45850b025536709e
Author: johankoi <[email protected]>
AuthorDate: Tue Jun 14 01:37:53 2022 +0800

    feat: client server message
    
    Signed-off-by: johankoi <[email protected]>
---
 Cargo.toml                                   |   2 +
 {xds => examples/protobuf/client}/Cargo.toml |   5 +-
 examples/protobuf/client/src/main.rs         |  17 +
 {xds => examples/protobuf/server}/Cargo.toml |   4 +-
 examples/protobuf/server/src/main.rs         |  11 +
 xds/Cargo.toml                               |  19 ++
 xds/src/client/client.rs                     |  59 ++++
 xds/src/client/mod.rs                        |   4 +
 xds/src/lib.rs                               |  16 +-
 xds/src/protocol/error.rs                    | 189 +++++++++++
 xds/src/protocol/message.rs                  | 468 +++++++++++++++++++++++++++
 xds/src/protocol/mod.rs                      |   6 +
 xds/src/server/mod.rs                        |   4 +
 xds/src/server/server.rs                     |  76 +++++
 14 files changed, 876 insertions(+), 4 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 3720b7d..f73b8c4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,4 +6,6 @@ members = [
   "metadata",
   "common",
   "config",
+  "examples/protobuf/client",
+  "examples/protobuf/server",
 ]
\ No newline at end of file
diff --git a/xds/Cargo.toml b/examples/protobuf/client/Cargo.toml
similarity index 57%
copy from xds/Cargo.toml
copy to examples/protobuf/client/Cargo.toml
index 75e2515..b0bbc6e 100644
--- a/xds/Cargo.toml
+++ b/examples/protobuf/client/Cargo.toml
@@ -1,8 +1,11 @@
 [package]
-name = "xds"
+name = "protobuf"
 version = "0.1.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+xds = { version = "0.1.0", path = "../../../xds" }
+tokio = {version = "1.9.0", features = ["full"]}
+
diff --git a/examples/protobuf/client/src/main.rs b/examples/protobuf/client/src/main.rs
new file mode 100644
index 0000000..0ae5d6a
--- /dev/null
+++ b/examples/protobuf/client/src/main.rs
@@ -0,0 +1,17 @@
+use std::collections::hash_map::HashMap;
+use xds::{ client::RpcClient };
+
+/// Sends a single `helloworld.hello` request to the server at 127.0.0.1:8972.
+#[tokio::main]
+async fn main() {
+    let task = tokio::spawn(async move {
+        let mut client = RpcClient::new(String::from("http://127.0.0.1:8972"));
+
+        println!("client call begin");
+        let service_path = String::from("helloworld");
+        let service_method = String::from("hello");
+        let metadata = HashMap::new();
+        // `call` returns a `Result`; report a failure instead of silently dropping it.
+        if let Err(err) = client.call(service_path, service_method, &metadata).await {
+            eprintln!("client call failed: {}", err);
+        }
+    });
+    task.await.unwrap();
+}
\ No newline at end of file
diff --git a/xds/Cargo.toml b/examples/protobuf/server/Cargo.toml
similarity index 58%
copy from xds/Cargo.toml
copy to examples/protobuf/server/Cargo.toml
index 75e2515..fe077e4 100644
--- a/xds/Cargo.toml
+++ b/examples/protobuf/server/Cargo.toml
@@ -1,8 +1,10 @@
 [package]
-name = "xds"
+name = "server"
 version = "0.1.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+xds = { version = "0.1.0", path = "../../../xds" }
+tokio = {version = "1.9.0", features = ["full"]}
\ No newline at end of file
diff --git a/examples/protobuf/server/src/main.rs b/examples/protobuf/server/src/main.rs
new file mode 100644
index 0000000..a182144
--- /dev/null
+++ b/examples/protobuf/server/src/main.rs
@@ -0,0 +1,11 @@
+use std::net::SocketAddr;
+use xds::{ server::RpcServer };
+
+
+/// Starts the example RPC server on 127.0.0.1:8972 and waits for it to finish.
+#[tokio::main]
+async fn main() {
+    let addr = SocketAddr::from(([127, 0, 0, 1], 8972));
+    // `start` borrows the server immutably, so the binding needs no `mut`.
+    let server = RpcServer::new(addr);
+    server.start().await;
+    println!("RpcServer ok");
+}
\ No newline at end of file
diff --git a/xds/Cargo.toml b/xds/Cargo.toml
index 75e2515..49af168 100644
--- a/xds/Cargo.toml
+++ b/xds/Cargo.toml
@@ -6,3 +6,22 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+hyper = { version = "0.14", features = ["full"] }
+tokio = {version = "1.9.0", features = ["full"]}
+futures = "0.3.16"
+
+# protocol
+byteorder = "1.3.2"
+strum = "0.21.0"
+strum_macros = "0.21.1"
+num-traits = "0.2.8"
+enum-primitive-derive = "0.1.2"
+serde = { version = "1.0.126",features = ["derive"]}
+serde_json = "1.0.40"
+bytes = "1.0.1"
+flate2 = "1.0"
+
+# server
+tower = "0.4"
+log = "0.4"
+env_logger = "0.9.0"
diff --git a/xds/src/client/client.rs b/xds/src/client/client.rs
index 2944f98..4cad8f3 100644
--- a/xds/src/client/client.rs
+++ b/xds/src/client/client.rs
@@ -14,3 +14,62 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+use hyper::client::conn::Builder;
+use hyper::client::connect::HttpConnector;
+use hyper::client::service::Connect;
+use hyper::service::Service;
+use hyper::{Body, Request};
+
+use crate::protocol::message::*;
+use std::collections::HashMap;
+
+
+/// HTTP client that sends one encoded `Message` per call to a fixed address.
+pub struct RpcClient {
+    /// Target URI as text (e.g. `http://127.0.0.1:8972`); parsed on every `call`.
+    addr: String
+}
+
+impl RpcClient {
+    /// Creates a client for `addr`. The address is only validated when `call` runs.
+    pub fn new(addr: String) -> RpcClient {
+        RpcClient { addr }
+    }
+
+    /// Encodes a request for `service_path`/`service_method` with a copy of
+    /// `metadata`, sends it as the body of an HTTP GET and prints the response body.
+    ///
+    /// # Errors
+    /// Returns an error when the configured address is not a valid URI, when
+    /// the connection cannot be established, or when the HTTP exchange fails.
+    pub async fn call(
+        &mut self,
+        service_path: String,
+        service_method: String,
+        metadata: &Metadata
+    ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>
+    {
+        let mut req = Message::new();
+        req.set_version(0);
+        req.set_message_type(MessageType::Request);
+        req.set_serialize_type(SerializeType::Protobuf);
+        req.set_compress_type(CompressType::Gzip);
+        req.service_path = service_path;
+        req.service_method = service_method;
+        req.metadata.replace(metadata.clone());
+        let body_data = req.encode();
+
+        // A malformed address is a caller error: surface it instead of panicking.
+        let uri = self.addr.parse::<hyper::Uri>()?;
+        let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new());
+        let mut svc = mk_svc.call(uri.clone()).await?;
+
+        let req = Request::get(uri).body(Body::from(body_data))?;
+        let res = svc.call(req).await?;
+        println!("RESPONSE={:?}", res.body());
+
+        Ok(())
+    }
+}
+
diff --git a/xds/src/client/mod.rs b/xds/src/client/mod.rs
index 2944f98..d8d317f 100644
--- a/xds/src/client/mod.rs
+++ b/xds/src/client/mod.rs
@@ -14,3 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+pub(crate) mod client;
+pub use self::client::*;
+
diff --git a/xds/src/lib.rs b/xds/src/lib.rs
index 3e01853..736d111 100644
--- a/xds/src/lib.rs
+++ b/xds/src/lib.rs
@@ -15,11 +15,23 @@
  * limitations under the License.
  */
 
+pub mod client;
+pub mod server;
+mod protocol;
+
 #[cfg(test)]
 mod tests {
+    use crate::client::client::RpcClient;
+    use crate::server::server::RpcServer;
+    use std::net::SocketAddr;
     #[test]
     fn it_works() {
-        let result = 2 + 2;
-        assert_eq!(result, 4);
+        // RpcClient
+        let client = RpcClient::new(String::from("http://127.0.0.1:8972"));
+
+        // RpcServer
+        let addr = SocketAddr::from(([127, 0, 0, 1], 8972));
+        let server = RpcServer::new(addr);
+        println!("it_works");
     }
 }
diff --git a/xds/src/protocol/error.rs b/xds/src/protocol/error.rs
new file mode 100644
index 0000000..c1cdd8f
--- /dev/null
+++ b/xds/src/protocol/error.rs
@@ -0,0 +1,189 @@
+use std::{convert::From, error, fmt, result, str};
+
+pub type Result<T> = result::Result<T, Error>;
+
+/// Error type of the protocol layer. The concrete representation is kept
+/// private in `Repr`.
+pub struct Error {
+    repr: Repr,
+}
+
+// Debug output is delegated to the inner representation.
+impl fmt::Debug for Error {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Debug::fmt(&self.repr, f)
+    }
+}
+
+/// Internal representation of an `Error`.
+enum Repr {
+    /// Free-form message with no explicit kind.
+    Other(String),
+    /// Bare kind with no further payload.
+    Simple(ErrorKind),
+    /// Kind paired with a boxed underlying error.
+    Custom(Box<Custom>),
+}
+
+/// Payload of `Repr::Custom`: an error kind plus the wrapped error.
+#[derive(Debug)]
+struct Custom {
+    kind: ErrorKind,
+    error: Box<dyn error::Error + Send + Sync>,
+}
+
+/// Coarse classification of an error.
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum ErrorKind {
+    Protocol,
+    IO,
+    Client,
+    Network,
+    Server,
+    Serialization,
+    Other,
+}
+
+impl ErrorKind {
+    /// Returns the fixed, human-readable text for this kind.
+    pub(crate) fn as_str(self) -> &'static str {
+        match self {
+            ErrorKind::Protocol => "invalid protocol",
+            ErrorKind::IO => "io issue",
+            ErrorKind::Client => "client error",
+            ErrorKind::Network => "network issue",
+            ErrorKind::Server => "server error",
+            ErrorKind::Serialization => "serialization failure",
+            ErrorKind::Other => "other",
+        }
+    }
+}
+
+impl From<&'static str> for Error {
+    #[inline]
+    fn from(s: &'static str) -> Error {
+        Error {
+            repr: Repr::Other(s.to_owned()),
+        }
+    }
+}
+
+impl From<String> for Error {
+    #[inline]
+    fn from(s: String) -> Error {
+        Error {
+            repr: Repr::Other(s),
+        }
+    }
+}
+
+impl From<serde_json::error::Error> for Error {
+    #[inline]
+    fn from(err: serde_json::error::Error) -> Error {
+        Error::new(ErrorKind::IO, err)
+    }
+}
+
+impl From<std::io::Error> for Error {
+    #[inline]
+    fn from(err: std::io::Error) -> Error {
+        Error::new(ErrorKind::IO, err)
+    }
+}
+
+impl From<Box<dyn std::error::Error + Send + Sync>> for Error {
+    #[inline]
+    fn from(err: Box<dyn std::error::Error + Send + Sync>) -> Error {
+        Error::_new(ErrorKind::Other, err)
+    }
+}
+impl From<ErrorKind> for Error {
+    #[inline]
+    fn from(kind: ErrorKind) -> Error {
+        Error {
+            repr: Repr::Simple(kind),
+        }
+    }
+}
+
+impl Error {
+    /// Creates an error of the given `kind` that wraps `error`.
+    pub fn new<E>(kind: ErrorKind, error: E) -> Error
+    where
+        E: Into<Box<dyn error::Error + Send + Sync>>,
+    {
+        Self::_new(kind, error.into())
+    }
+
+    /// Non-generic constructor behind `new`; takes the already boxed error.
+    fn _new(kind: ErrorKind, error: Box<dyn error::Error + Send + Sync>) -> Error {
+        let custom = Custom { kind, error };
+        Error {
+            repr: Repr::Custom(Box::new(custom)),
+        }
+    }
+
+    /// Borrows the wrapped error, if this error was built around one.
+    pub fn get_ref(&self) -> Option<&(dyn error::Error + Send + Sync + 'static)> {
+        match &self.repr {
+            Repr::Custom(custom) => Some(&*custom.error),
+            Repr::Other(_) | Repr::Simple(_) => None,
+        }
+    }
+
+    /// Mutably borrows the wrapped error, if this error was built around one.
+    pub fn get_mut(&mut self) -> Option<&mut (dyn error::Error + Send + Sync + 'static)> {
+        match &mut self.repr {
+            Repr::Custom(custom) => Some(&mut *custom.error),
+            Repr::Other(_) | Repr::Simple(_) => None,
+        }
+    }
+
+    /// Consumes the error and returns the wrapped error, if any.
+    pub fn into_inner(self) -> Option<Box<dyn error::Error + Send + Sync>> {
+        match self.repr {
+            Repr::Custom(custom) => Some(custom.error),
+            Repr::Other(_) | Repr::Simple(_) => None,
+        }
+    }
+
+    /// Returns the kind of this error; free-form messages report `ErrorKind::Other`.
+    pub fn kind(&self) -> ErrorKind {
+        match &self.repr {
+            Repr::Simple(kind) => *kind,
+            Repr::Custom(custom) => custom.kind,
+            Repr::Other(_) => ErrorKind::Other,
+        }
+    }
+}
+
+/// Debug rendering: the raw message, the wrapped `Custom`, or `Kind(<kind>)`.
+impl fmt::Debug for Repr {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Repr::Simple(kind) => fmt.debug_tuple("Kind").field(kind).finish(),
+            Repr::Custom(custom) => fmt::Debug::fmt(custom, fmt),
+            Repr::Other(message) => write!(fmt, "{}", message),
+        }
+    }
+}
+
+/// User-facing rendering: the stored message, the wrapped error's own
+/// formatting, or the fixed text of the kind.
+impl fmt::Display for Error {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match &self.repr {
+            Repr::Other(s) => write!(fmt, "{}", s),
+            // Delegates to the wrapped error's `fmt`.
+            Repr::Custom(ref c) => c.error.fmt(fmt),
+            Repr::Simple(kind) => write!(fmt, "{}", kind.as_str()),
+        }
+    }
+}
+
+impl error::Error for Error {
+    /// Short description: the stored message, the kind's text, or the wrapped
+    /// error's description.
+    // `description` is deprecated in std; it is kept so existing callers see
+    // the same text, and the lint is silenced for the delegating call.
+    #[allow(deprecated)]
+    fn description(&self) -> &str {
+        match &self.repr {
+            Repr::Other(s) => s.as_str(),
+            Repr::Simple(kind) => kind.as_str(),
+            Repr::Custom(c) => c.error.description(),
+        }
+    }
+
+    /// Returns the source of the wrapped error (not the wrapped error itself).
+    /// The deprecated `cause` is no longer overridden: its default
+    /// implementation forwards to `source`, which yields the same value.
+    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
+        match &self.repr {
+            Repr::Custom(c) => c.error.source(),
+            Repr::Other(_) | Repr::Simple(_) => None,
+        }
+    }
+}
diff --git a/xds/src/protocol/message.rs b/xds/src/protocol/message.rs
new file mode 100644
index 0000000..1895172
--- /dev/null
+++ b/xds/src/protocol/message.rs
@@ -0,0 +1,468 @@
+use byteorder::{BigEndian, ByteOrder};
+use enum_primitive_derive::Primitive;
+use flate2::{read::GzDecoder, write::GzEncoder, Compression};
+use num_traits::{FromPrimitive, ToPrimitive};
+use strum_macros::{Display, EnumIter, EnumString};
+
+use std::{
+    cell::RefCell,
+    collections::hash_map::HashMap,
+    io::{Read, Write},
+};
+
+use super::error::*;
+
+
+const MAGIC_NUMBER: u8 = 0x08;
+pub const SERVICE_ERROR: &str = "__rpcx_error__";
+
+/// Direction of a message.
+#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)]
+pub enum MessageType {
+    Request = 0,
+    Response = 1,
+}
+
+/// Outcome flag carried by a message.
+#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)]
+pub enum MessageStatusType {
+    Normal = 0,
+    Error = 1,
+}
+
+/// Compression applied to the payload.
+#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)]
+pub enum CompressType {
+    CompressNone = 0,
+    Gzip = 1,
+}
+
+/// Serialization format of the payload.
+#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)]
+pub enum SerializeType {
+    SerializeNone = 0,
+    JSON = 1,
+    Protobuf = 2,
+    MsgPack = 3,
+    Thrift = 4,
+}
+
+/// Defines the rpcx message interface: header accessors plus wire
+/// encoding and decoding.
+pub trait RpcxMessage {
+    /// Returns whether the message carries the expected magic number.
+    fn check_magic_number(&self) -> bool;
+    /// Returns the protocol version.
+    fn get_version(&self) -> u8;
+    /// Sets the protocol version.
+    fn set_version(&mut self, v: u8);
+    /// Returns the message type, or `None` if the stored value is unknown.
+    fn get_message_type(&self) -> Option<MessageType>;
+    /// Sets the message type.
+    fn set_message_type(&mut self, mt: MessageType);
+    /// Returns whether the heartbeat flag is set.
+    fn is_heartbeat(&self) -> bool;
+    /// Sets or clears the heartbeat flag.
+    fn set_heartbeat(&mut self, b: bool);
+    /// Returns whether the one-way flag is set.
+    fn is_oneway(&self) -> bool;
+    /// Sets or clears the one-way flag.
+    fn set_oneway(&mut self, b: bool);
+    /// Returns the compress type, or `None` if the stored value is unknown.
+    fn get_compress_type(&self) -> Option<CompressType>;
+    /// Sets the compress type.
+    fn set_compress_type(&mut self, ct: CompressType);
+    /// Returns the status type, or `None` if the stored value is unknown.
+    fn get_message_status_type(&self) -> Option<MessageStatusType>;
+    /// Sets the status type.
+    fn set_message_status_type(&mut self, mst: MessageStatusType);
+    /// Returns the serialize type, or `None` if the stored value is unknown.
+    fn get_serialize_type(&self) -> Option<SerializeType>;
+    /// Sets the serialize type.
+    fn set_serialize_type(&mut self, st: SerializeType);
+    /// Returns the sequence number.
+    fn get_seq(&self) -> u64;
+    /// Sets the sequence number.
+    fn set_seq(&mut self, seq: u64);
+    /// Reads one message from `r` into `self`.
+    fn decode<R: ?Sized>(&mut self, r: &mut R) -> Result<()>
+    where
+        R: Read;
+    /// Serializes the message into its wire representation.
+    fn encode(&self) -> Vec<u8>;
+
+    /// Returns the error text attached to the message, if there is one.
+    fn get_error(&self) -> Option<String>;
+}
+
+pub type Metadata = HashMap<String, String>;
+
+/// A common struct for request and response.
+#[derive(Debug, Default)]
+pub struct Message {
+    /// Fixed 12-byte header.
+    pub header: [u8; 12],
+    /// Name of the target service.
+    pub service_path: String,
+    /// Name of the target method.
+    pub service_method: String,
+    /// Key/value metadata; wrapped in a `RefCell` so it can be changed through `&self`.
+    pub metadata: RefCell<Metadata>,
+    /// Message body bytes.
+    pub payload: Vec<u8>,
+}
+impl Message {
+    /// Creates a new `Message`: everything zeroed or empty except the magic
+    /// number in header byte 0.
+    pub fn new() -> Self {
+        // `Default` already yields a zeroed header and empty metadata.
+        let mut msg = Message::default();
+        msg.header[0] = MAGIC_NUMBER;
+        msg
+    }
+
+    /// Builds the skeleton of a response to this message. It copies the
+    /// version, compress type, serialize type, sequence number, service path
+    /// and method, and marks the reply as a `Response` with `Normal` status.
+    /// Metadata and payload are left empty.
+    ///
+    /// # Errors
+    /// Returns an error when this message's header holds a compress or
+    /// serialize type that is not a known enum value.
+    pub fn get_reply(&self) -> Result<Self> {
+        let compress_type = self
+            .get_compress_type()
+            .ok_or_else(|| Error::from("unknown compress type in message header"))?;
+        let serialize_type = self
+            .get_serialize_type()
+            .ok_or_else(|| Error::from("unknown serialize type in message header"))?;
+
+        let mut reply = Message::new();
+        reply.set_version(self.get_version());
+        reply.set_compress_type(compress_type);
+        reply.set_message_status_type(MessageStatusType::Normal);
+        reply.set_message_type(MessageType::Response);
+        reply.set_serialize_type(serialize_type);
+        reply.set_seq(self.get_seq());
+        reply.service_path = self.service_path.clone();
+        reply.service_method = self.service_method.clone();
+
+        Ok(reply)
+    }
+}
+
+/// Header layout used by the accessors below:
+/// byte 0 magic number, byte 1 version, byte 2 flags (bit 7 message type,
+/// bit 6 heartbeat, bit 5 one-way, bits 4..2 compress type, bits 1..0 status),
+/// byte 3 upper nibble serialize type, bytes 4..12 big-endian sequence number.
+impl RpcxMessage for Message {
+    fn check_magic_number(&self) -> bool {
+        self.header[0] == MAGIC_NUMBER
+    }
+
+    fn get_version(&self) -> u8 {
+        self.header[1]
+    }
+    fn set_version(&mut self, v: u8) {
+        self.header[1] = v;
+    }
+
+    fn get_message_type(&self) -> Option<MessageType> {
+        MessageType::from_u8((self.header[2] & 0x80) >> 7)
+    }
+    fn set_message_type(&mut self, mt: MessageType) {
+        // Clear the bit before setting it: OR-ing alone could never turn a
+        // `Response` back into a `Request`.
+        self.header[2] = (self.header[2] & !0x80) | (mt.to_u8().unwrap() << 7);
+    }
+    fn is_heartbeat(&self) -> bool {
+        self.header[2] & 0x40 == 0x40
+    }
+    fn set_heartbeat(&mut self, b: bool) {
+        if b {
+            self.header[2] |= 0x40;
+        } else {
+            self.header[2] &= !0x40;
+        }
+    }
+    fn is_oneway(&self) -> bool {
+        self.header[2] & 0x20 == 0x20
+    }
+    fn set_oneway(&mut self, b: bool) {
+        if b {
+            self.header[2] |= 0x20;
+        } else {
+            self.header[2] &= !0x20;
+        }
+    }
+    fn get_compress_type(&self) -> Option<CompressType> {
+        CompressType::from_u8((self.header[2] & 0x1C) >> 2)
+    }
+    fn set_compress_type(&mut self, ct: CompressType) {
+        self.header[2] = (self.header[2] & !0x1C) | ((ct.to_u8().unwrap() << 2) & 0x1C);
+    }
+    fn get_message_status_type(&self) -> Option<MessageStatusType> {
+        MessageStatusType::from_u8(self.header[2] & 0x03)
+    }
+    fn set_message_status_type(&mut self, mst: MessageStatusType) {
+        self.header[2] = (self.header[2] & !0x03) | (mst.to_u8().unwrap() & 0x03);
+    }
+    fn get_serialize_type(&self) -> Option<SerializeType> {
+        SerializeType::from_u8((self.header[3] & 0xF0) >> 4)
+    }
+    fn set_serialize_type(&mut self, st: SerializeType) {
+        self.header[3] = (self.header[3] & !0xF0) | (st.to_u8().unwrap() << 4);
+    }
+    fn get_seq(&self) -> u64 {
+        u64_from_slice(&self.header[4..])
+    }
+    fn set_seq(&mut self, seq: u64) {
+        u64_to_slice(seq, &mut self.header[4..]);
+    }
+
+    /// Reads one frame from `r`: the 12-byte header, a 4-byte big-endian body
+    /// length, then the body. The body holds length-prefixed service path,
+    /// service method, metadata (a run of length-prefixed key/value strings)
+    /// and payload. A gzip payload is decompressed into `self.payload`.
+    ///
+    /// # Errors
+    /// Returns an error on a short read, on any length prefix that points
+    /// outside the body, on invalid payload length, on an unknown compress
+    /// type, or when decompression fails. Fields parsed before the failure
+    /// stay assigned.
+    fn decode<R: ?Sized>(&mut self, r: &mut R) -> Result<()>
+    where
+        R: Read,
+    {
+        /// Reads the 4-byte length prefix at `*pos`, returns the bytes it
+        /// covers and advances `*pos` past them; errors instead of slicing
+        /// out of range.
+        fn take_field<'a>(buf: &'a [u8], pos: &mut usize) -> Result<&'a [u8]> {
+            let body_start = pos
+                .checked_add(4)
+                .filter(|&s| s <= buf.len())
+                .ok_or_else(|| Error::from("truncated length prefix"))?;
+            let len = read_len(&buf[*pos..body_start]) as usize;
+            let body_end = body_start
+                .checked_add(len)
+                .filter(|&e| e <= buf.len())
+                .ok_or_else(|| Error::from("field length exceeds message body"))?;
+            *pos = body_end;
+            Ok(&buf[body_start..body_end])
+        }
+
+        r.read_exact(&mut self.header)?;
+
+        // `read` may return fewer bytes than requested (e.g. on a socket);
+        // `read_exact` either fills the buffer or reports the short read.
+        let mut len_buf = [0u8; 4];
+        r.read_exact(&mut len_buf)?;
+        let body_len = BigEndian::read_u32(&len_buf) as usize; // length of everything after the header
+        let mut buf = vec![0u8; body_len];
+        r.read_exact(&mut buf)?;
+
+        let mut pos = 0;
+        self.service_path = read_str(take_field(&buf, &mut pos)?)?;
+        self.service_method = read_str(take_field(&buf, &mut pos)?)?;
+
+        // metadata: key/value pairs; a trailing key without a value maps to "".
+        let metadata_bytes = take_field(&buf, &mut pos)?;
+        let mut meta_pos = 0;
+        while meta_pos < metadata_bytes.len() {
+            let key = read_str(take_field(metadata_bytes, &mut meta_pos)?)?;
+            let value = if meta_pos < metadata_bytes.len() {
+                read_str(take_field(metadata_bytes, &mut meta_pos)?)?
+            } else {
+                String::new()
+            };
+            self.metadata.borrow_mut().insert(key, value);
+        }
+
+        // payload: must span exactly the rest of the body.
+        let payload_start = pos
+            .checked_add(4)
+            .filter(|&s| s <= buf.len())
+            .ok_or_else(|| Error::from("truncated length prefix"))?;
+        let declared_len = read_len(&buf[pos..payload_start]) as usize;
+        let payload = &buf[payload_start..];
+        if declared_len != payload.len() {
+            return Err(Error::from("invalid payload length"));
+        }
+
+        let compress_type = self
+            .get_compress_type()
+            .ok_or_else(|| Error::from("unknown compress type in message header"))?;
+        let mut vp = Vec::with_capacity(payload.len());
+        match compress_type {
+            CompressType::Gzip => {
+                let mut deflater = GzDecoder::new(payload);
+                deflater.read_to_end(&mut vp)?;
+            }
+            CompressType::CompressNone => {
+                vp.extend_from_slice(payload);
+            }
+        }
+        self.payload = vp;
+
+        Ok(())
+    }
+
+    /// Serializes the message: header, 4-byte body length, then
+    /// length-prefixed service path, service method, metadata and payload
+    /// (gzip-compressed when the header says so).
+    ///
+    /// # Panics
+    /// Panics when the header holds a compress type that is not a known
+    /// enum value.
+    fn encode(&self) -> Vec<u8> {
+        let mut buf = Vec::<u8>::with_capacity(20);
+        buf.extend_from_slice(&self.header);
+
+        // Placeholder for the body length; patched once the body is complete.
+        buf.extend_from_slice(&write_len(0));
+
+        // service_path
+        buf.extend_from_slice(&write_len(self.service_path.len() as u32));
+        buf.extend_from_slice(self.service_path.as_bytes());
+
+        // service_method
+        buf.extend_from_slice(&write_len(self.service_method.len() as u32));
+        buf.extend_from_slice(self.service_method.as_bytes());
+
+        // metadata; a shared borrow is enough and cannot conflict with other readers.
+        let mut metadata_bytes = Vec::<u8>::new();
+        for (key, value) in self.metadata.borrow().iter() {
+            metadata_bytes.extend_from_slice(&write_len(key.len() as u32));
+            metadata_bytes.extend_from_slice(key.as_bytes());
+            metadata_bytes.extend_from_slice(&write_len(value.len() as u32));
+            metadata_bytes.extend_from_slice(value.as_bytes());
+        }
+        buf.extend_from_slice(&write_len(metadata_bytes.len() as u32));
+        buf.append(&mut metadata_bytes);
+
+        // payload, compressed if requested
+        match self
+            .get_compress_type()
+            .expect("header holds an unknown compress type")
+        {
+            CompressType::Gzip => {
+                let mut e = GzEncoder::new(Vec::new(), Compression::fast());
+                e.write_all(&self.payload)
+                    .expect("writing to an in-memory buffer cannot fail");
+                let compressed_payload = e
+                    .finish()
+                    .expect("writing to an in-memory buffer cannot fail");
+                buf.extend_from_slice(&write_len(compressed_payload.len() as u32));
+                buf.extend_from_slice(&compressed_payload);
+            }
+            CompressType::CompressNone => {
+                buf.extend_from_slice(&write_len(self.payload.len() as u32));
+                buf.extend_from_slice(&self.payload);
+            }
+        }
+
+        // Patch the real body length (everything after header + length field).
+        let body_len = buf.len() - 12 - 4;
+        buf[12..16].copy_from_slice(&write_len(body_len as u32));
+
+        buf
+    }
+
+    /// Returns the `SERVICE_ERROR` metadata entry when the status is `Error`.
+    fn get_error(&self) -> Option<String> {
+        match self.get_message_status_type() {
+            Some(MessageStatusType::Error) => {
+                self.metadata.borrow().get(SERVICE_ERROR).cloned()
+            }
+            _ => None,
+        }
+    }
+}
+
+/// Reads a big-endian `u32` from the first four bytes of `buf`.
+fn read_len(buf: &[u8]) -> u32 {
+    BigEndian::read_u32(&buf[..4])
+}
+
+/// Encodes `len` as four big-endian bytes.
+fn write_len(len: u32) -> [u8; 4] {
+    let mut buf = [0u8; 4];
+    BigEndian::write_u32(&mut buf, len);
+    buf
+}
+
+/// Copies `buf` into an owned `String`.
+///
+/// # Errors
+/// Returns an error when `buf` is not valid UTF-8. The bytes come off the
+/// wire, so this must not panic.
+fn read_str(buf: &[u8]) -> Result<String> {
+    std::str::from_utf8(buf)
+        .map(str::to_owned)
+        .map_err(|e| Error::new(ErrorKind::Protocol, e))
+}
+
+/// Reads a big-endian `u64` from `b`.
+fn u64_from_slice(b: &[u8]) -> u64 {
+    BigEndian::read_u64(b)
+}
+
+/// Writes `v` into `b` as big-endian bytes.
+fn u64_to_slice(v: u64, b: &mut [u8]) {
+    BigEndian::write_u64(b, v);
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// One complete frame: 12-byte header, 4-byte body length (98), then
+    /// service path "Arith", method "Add", one metadata pair and a
+    /// 26-byte uncompressed payload.
+    const MSG_DATA: [u8; 114] = [
+        8, 0, 0, 16, 0, 0, 0, 0, 73, 150, 2, 210, 0, 0, 0, 98, 0, 0, 0, 5, 65, 114, 105, 116,
+        104, 0, 0, 0, 3, 65, 100, 100, 0, 0, 0, 48, 0, 0, 0, 4, 95, 95, 73, 68, 0, 0, 0, 36,
+        54, 98, 97, 55, 98, 56, 49, 48, 45, 57, 100, 97, 100, 45, 49, 49, 100, 49, 45, 56, 48,
+        98, 52, 45, 48, 48, 99, 48, 52, 102, 100, 52, 51, 48, 99, 57, 0, 0, 0, 26, 123, 10, 9,
+        9, 34, 65, 34, 58, 32, 49, 44, 10, 9, 9, 34, 66, 34, 58, 32, 50, 44, 10, 9, 125, 10, 9,
+    ];
+
+    #[test]
+    fn parse_header() {
+        let mut msg = Message::new();
+        msg.header.copy_from_slice(&MSG_DATA[..12]);
+
+        assert_eq!(true, msg.check_magic_number());
+        assert_eq!(0, msg.get_version());
+        assert_eq!(MessageType::Request, msg.get_message_type().unwrap());
+        assert_eq!(false, msg.is_heartbeat());
+        assert_eq!(false, msg.is_oneway());
+        assert_eq!(CompressType::CompressNone, msg.get_compress_type().unwrap());
+        assert_eq!(
+            MessageStatusType::Normal,
+            msg.get_message_status_type().unwrap()
+        );
+        assert_eq!(SerializeType::JSON, msg.get_serialize_type().unwrap());
+        assert_eq!(1234567890, msg.get_seq());
+    }
+
+    #[test]
+    fn set_header() {
+        let mut msg = Message::new();
+        msg.header.copy_from_slice(&MSG_DATA[..12]);
+
+        msg.set_version(0);
+        msg.set_message_type(MessageType::Response);
+        msg.set_heartbeat(true);
+        msg.set_oneway(true);
+        msg.set_compress_type(CompressType::Gzip);
+        msg.set_serialize_type(SerializeType::MsgPack);
+        msg.set_message_status_type(MessageStatusType::Normal);
+        msg.set_seq(1000000);
+
+        assert_eq!(true, msg.check_magic_number());
+        assert_eq!(0, msg.get_version());
+        assert_eq!(MessageType::Response, msg.get_message_type().unwrap());
+        assert_eq!(true, msg.is_heartbeat());
+        assert_eq!(true, msg.is_oneway());
+        assert_eq!(CompressType::Gzip, msg.get_compress_type().unwrap());
+        assert_eq!(
+            MessageStatusType::Normal,
+            msg.get_message_status_type().unwrap()
+        );
+        assert_eq!(SerializeType::MsgPack, msg.get_serialize_type().unwrap());
+        assert_eq!(1000000, msg.get_seq());
+    }
+
+    #[test]
+    fn decode() {
+        let mut msg = Message::new();
+
+        let mut data = &MSG_DATA[..];
+        // A decode failure must fail the test rather than be printed and ignored.
+        msg.decode(&mut data).expect("failed to parse");
+
+        assert_eq!("Arith", msg.service_path);
+        assert_eq!("Add", msg.service_method);
+
+        assert_eq!(
+            "6ba7b810-9dad-11d1-80b4-00c04fd430c9",
+            msg.metadata.borrow().get("__ID").unwrap()
+        );
+
+        assert_eq!(
+            "{\n\t\t\"A\": 1,\n\t\t\"B\": 2,\n\t}\n\t",
+            std::str::from_utf8(&msg.payload).unwrap()
+        );
+    }
+
+    #[test]
+    fn encode() {
+        let mut msg = Message::new();
+
+        let mut data = &MSG_DATA[..];
+        msg.decode(&mut data).expect("failed to parse");
+
+        let encoded_bytes = msg.encode();
+
+        assert_eq!(&MSG_DATA[..], &encoded_bytes[..]);
+    }
+}
diff --git a/xds/src/protocol/mod.rs b/xds/src/protocol/mod.rs
new file mode 100644
index 0000000..1d650a8
--- /dev/null
+++ b/xds/src/protocol/mod.rs
@@ -0,0 +1,6 @@
+pub mod error;
+pub mod message;
+
+
+pub use message::*;
+pub use error::*;
\ No newline at end of file
diff --git a/xds/src/server/mod.rs b/xds/src/server/mod.rs
index 2944f98..f7d8248 100644
--- a/xds/src/server/mod.rs
+++ b/xds/src/server/mod.rs
@@ -14,3 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+pub(crate) mod server;
+pub use self::server::*;
+
diff --git a/xds/src/server/server.rs b/xds/src/server/server.rs
index 2944f98..be343fb 100644
--- a/xds/src/server/server.rs
+++ b/xds/src/server/server.rs
@@ -14,3 +14,79 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+use std::convert::Infallible;
+use std::env;
+use std::process::Output;
+use std::task::Poll;
+use hyper::client::service;
+use hyper::{Body, Request, Response, Server as hyper_server};
+use hyper::service::{make_service_fn, service_fn};
+use tower::Service;
+use futures::future::{ ready, Ready, BoxFuture};
+use std::net::SocketAddr;
+use crate::protocol::message::*;
+
+/// HTTP server that hands every incoming request to `RPCHandler`.
+pub struct RpcServer {
+    /// Socket address the server binds to.
+    pub addr: SocketAddr
+}
+
+impl RpcServer {
+    /// Creates a server for `addr`; nothing is bound until `start` runs.
+    pub fn new(addr: SocketAddr) -> Self {
+        RpcServer { addr }
+    }
+
+    /// Binds to `addr` and serves requests until the server stops.
+    /// Bind and serve failures are printed to stderr.
+    pub async fn start(&self) {
+        // `init` panics when a logger is already installed (for example when
+        // `start` is called twice); `try_init` turns that into a no-op.
+        let _ = env_logger::try_init();
+        let make_service = make_service_fn(|_conn| async {
+            Ok::<_, Infallible>(RPCHandler)
+        });
+
+        // `bind` panics when the address is unavailable; report it instead.
+        let builder = match hyper_server::try_bind(&self.addr) {
+            Ok(builder) => builder,
+            Err(e) => {
+                eprintln!("server error: {}", e);
+                return;
+            }
+        };
+        let server = builder.serve(make_service);
+        println!("Listening on http://{}", self.addr);
+        if let Err(e) = server.await {
+            eprintln!("server error: {}", e);
+        }
+    }
+}
+
+
+/// Stateless request handler: decodes the request body as a `Message`, logs
+/// the outcome and answers with a fixed greeting.
+#[derive(Copy, Clone)]
+struct RPCHandler;
+use std::result::Result as StdResult;
+
+impl Service<Request<Body>> for RPCHandler {
+    type Response = Response<Body>;
+    type Error = Infallible;
+    type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
+
+    /// The handler holds no state, so it is always ready.
+    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<StdResult<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    /// Collects the whole body, tries to decode it and replies. A body that
+    /// cannot be read yields `400 Bad Request`; a body that cannot be decoded
+    /// is logged and still receives the greeting.
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        Box::pin(async move {
+            // A client that drops the connection mid-body must not take the task down.
+            let whole_body = match hyper::body::to_bytes(req.into_body()).await {
+                Ok(bytes) => bytes,
+                Err(err) => {
+                    eprintln!("failed to read: {}", err);
+                    let mut resp = Response::new(Body::from("failed to read request body\n"));
+                    *resp.status_mut() = hyper::StatusCode::BAD_REQUEST;
+                    return Ok(resp);
+                }
+            };
+
+            let mut msg = Message::new();
+            let mut data = &whole_body[..];
+
+            match msg.decode(&mut data) {
+                Ok(()) => {
+                    let key = format!("{}.{}", msg.service_path, msg.service_method);
+                    println!("recieved request success, and body message decode key is: {:?}", key);
+                }
+                Err(err) => {
+                    eprintln!("failed to read: {}", err);
+                }
+            }
+            Ok(Response::new(Body::from("hello world with tower service! \n")))
+        })
+    }
+}

Reply via email to