This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch poc-idl in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 13d77d437ac523915cb3c9da45850b025536709e Author: johankoi <[email protected]> AuthorDate: Tue Jun 14 01:37:53 2022 +0800 feat: client server message Signed-off-by: johankoi <[email protected]> --- Cargo.toml | 2 + {xds => examples/protobuf/client}/Cargo.toml | 5 +- examples/protobuf/client/src/main.rs | 17 + {xds => examples/protobuf/server}/Cargo.toml | 4 +- examples/protobuf/server/src/main.rs | 11 + xds/Cargo.toml | 19 ++ xds/src/client/client.rs | 59 ++++ xds/src/client/mod.rs | 4 + xds/src/lib.rs | 16 +- xds/src/protocol/error.rs | 189 +++++++++++ xds/src/protocol/message.rs | 468 +++++++++++++++++++++++++++ xds/src/protocol/mod.rs | 6 + xds/src/server/mod.rs | 4 + xds/src/server/server.rs | 76 +++++ 14 files changed, 876 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3720b7d..f73b8c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,6 @@ members = [ "metadata", "common", "config", + "examples/protobuf/client", + "examples/protobuf/server", ] \ No newline at end of file diff --git a/xds/Cargo.toml b/examples/protobuf/client/Cargo.toml similarity index 57% copy from xds/Cargo.toml copy to examples/protobuf/client/Cargo.toml index 75e2515..b0bbc6e 100644 --- a/xds/Cargo.toml +++ b/examples/protobuf/client/Cargo.toml @@ -1,8 +1,11 @@ [package] -name = "xds" +name = "protobuf" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +xds = { version = "0.1.0", path = "../../../xds" } +tokio = {version = "1.9.0", features = ["full"]} + diff --git a/examples/protobuf/client/src/main.rs b/examples/protobuf/client/src/main.rs new file mode 100644 index 0000000..0ae5d6a --- /dev/null +++ b/examples/protobuf/client/src/main.rs @@ -0,0 +1,17 @@ +use std::collections::hash_map::HashMap; +use xds::{ client::RpcClient }; + +#[tokio::main] +async fn main() { + let task = tokio::spawn(async move { + let mut client = 
RpcClient::new(String::from("http://127.0.0.1:8972")); + + println!("client call begin"); + let service_path = String::from("helloworld"); + let service_method = String::from("hello"); + let metadata = HashMap::new(); + client.call(service_path, service_method, &metadata).await; + + }); + task.await.unwrap(); +} \ No newline at end of file diff --git a/xds/Cargo.toml b/examples/protobuf/server/Cargo.toml similarity index 58% copy from xds/Cargo.toml copy to examples/protobuf/server/Cargo.toml index 75e2515..fe077e4 100644 --- a/xds/Cargo.toml +++ b/examples/protobuf/server/Cargo.toml @@ -1,8 +1,10 @@ [package] -name = "xds" +name = "server" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +xds = { version = "0.1.0", path = "../../../xds" } +tokio = {version = "1.9.0", features = ["full"]} \ No newline at end of file diff --git a/examples/protobuf/server/src/main.rs b/examples/protobuf/server/src/main.rs new file mode 100644 index 0000000..a182144 --- /dev/null +++ b/examples/protobuf/server/src/main.rs @@ -0,0 +1,11 @@ +use std::net::SocketAddr; +use xds::{ server::RpcServer }; + + +#[tokio::main] +async fn main() { + let addr = SocketAddr::from(([127, 0, 0, 1], 8972)); + let mut server = RpcServer::new(addr); + server.start().await; + println!("RpcServer ok"); +} \ No newline at end of file diff --git a/xds/Cargo.toml b/xds/Cargo.toml index 75e2515..49af168 100644 --- a/xds/Cargo.toml +++ b/xds/Cargo.toml @@ -6,3 +6,22 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +hyper = { version = "0.14", features = ["full"] } +tokio = {version = "1.9.0", features = ["full"]} +futures = "0.3.16" + +# protocol +byteorder = "1.3.2" +strum = "0.21.0" +strum_macros = "0.21.1" +num-traits = "0.2.8" +enum-primitive-derive = "0.1.2" +serde = { version = "1.0.126",features = ["derive"]} +serde_json = 
"1.0.40" +bytes = "1.0.1" +flate2 = "1.0" + +# server +tower = "0.4" +log = "0.4" +env_logger = "0.9.0" diff --git a/xds/src/client/client.rs b/xds/src/client/client.rs index 2944f98..4cad8f3 100644 --- a/xds/src/client/client.rs +++ b/xds/src/client/client.rs @@ -14,3 +14,62 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +use hyper::client::conn::Builder; +use hyper::client::connect::HttpConnector; +use hyper::client::service::Connect; +use hyper::service::Service; +use hyper::{Body, Request}; + +use crate::protocol::message::*; +use std::collections::HashMap; + + +pub struct RpcClient { + addr: String +} + +impl RpcClient { + pub fn new(addr: String) -> RpcClient { + RpcClient { + addr + } + } + + pub async fn call( + &mut self, + service_path: String, + service_method: String, + metadata: &Metadata + ) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + { + let mut req = Message::new(); + req.set_version(0); + req.set_message_type(MessageType::Request); + req.set_serialize_type(SerializeType::Protobuf); + req.set_compress_type(CompressType::Gzip); + req.service_path = service_path; + req.service_method = service_method; + + let mut new_metadata = HashMap::with_capacity(metadata.len()); + for (k, v) in metadata { + new_metadata.insert(k.clone(), v.clone()); + } + req.metadata.replace(new_metadata); + let body_data = req.encode(); + + // println!("call, body_data={:?}", body_data); + + let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new()); + let uri = self.addr.parse::<hyper::Uri>().unwrap(); + let mut svc = mk_svc.call(uri.clone()).await?; + + let body = Body::from(body_data); + let req = Request::get(uri.clone()).body(body)?; + let res = svc.call(req).await?; + println!("RESPONSE={:?}", res.body()); + + Ok(()) + } +} + diff --git a/xds/src/client/mod.rs b/xds/src/client/mod.rs index 2944f98..d8d317f 100644 --- a/xds/src/client/mod.rs +++ b/xds/src/client/mod.rs @@ 
-14,3 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +pub(crate) mod client; +pub use self::client::*; + diff --git a/xds/src/lib.rs b/xds/src/lib.rs index 3e01853..736d111 100644 --- a/xds/src/lib.rs +++ b/xds/src/lib.rs @@ -15,11 +15,23 @@ * limitations under the License. */ +pub mod client; +pub mod server; +mod protocol; + #[cfg(test)] mod tests { + use crate::client::client::RpcClient; + use crate::server::server::RpcServer; + use std::net::SocketAddr; #[test] fn it_works() { - let result = 2 + 2; - assert_eq!(result, 4); + // RpcClient + let client = RpcClient::new(String::from("http://127.0.0.1:8972")); + + // RpcServer + let addr = SocketAddr::from(([127, 0, 0, 1], 8972)); + let server = RpcServer::new(addr); + println!("it_works"); } } diff --git a/xds/src/protocol/error.rs b/xds/src/protocol/error.rs new file mode 100644 index 0000000..c1cdd8f --- /dev/null +++ b/xds/src/protocol/error.rs @@ -0,0 +1,189 @@ +use std::{convert::From, error, fmt, result, str}; + +pub type Result<T> = result::Result<T, Error>; + +pub struct Error { + repr: Repr, +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.repr, f) + } +} + +enum Repr { + Other(String), + Simple(ErrorKind), + Custom(Box<Custom>), +} + +#[derive(Debug)] +struct Custom { + kind: ErrorKind, + error: Box<dyn error::Error + Send + Sync>, +} + +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum ErrorKind { + Protocol, + IO, + Client, + Network, + Server, + Serialization, + Other, +} + +impl ErrorKind { + pub(crate) fn as_str(self) -> &'static str { + match self { + ErrorKind::Protocol => "invalid protocol", + ErrorKind::IO => "io issue", + ErrorKind::Client => "client error", + ErrorKind::Network => "network issue", + ErrorKind::Server => "server error", + ErrorKind::Serialization => "serialization failure", + ErrorKind::Other => "other", + } + } 
+} + +impl From<&'static str> for Error { + #[inline] + fn from(s: &'static str) -> Error { + Error { + repr: Repr::Other(s.to_owned()), + } + } +} + +impl From<String> for Error { + #[inline] + fn from(s: String) -> Error { + Error { + repr: Repr::Other(s), + } + } +} + +impl From<serde_json::error::Error> for Error { + #[inline] + fn from(err: serde_json::error::Error) -> Error { + Error::new(ErrorKind::IO, err) + } +} + +impl From<std::io::Error> for Error { + #[inline] + fn from(err: std::io::Error) -> Error { + Error::new(ErrorKind::IO, err) + } +} + +impl From<Box<dyn std::error::Error + Send + Sync>> for Error { + #[inline] + fn from(err: Box<dyn std::error::Error + Send + Sync>) -> Error { + Error::_new(ErrorKind::Other, err) + } +} +impl From<ErrorKind> for Error { + #[inline] + fn from(kind: ErrorKind) -> Error { + Error { + repr: Repr::Simple(kind), + } + } +} + +impl Error { + pub fn new<E>(kind: ErrorKind, error: E) -> Error + where + E: Into<Box<dyn error::Error + Send + Sync>>, + { + Self::_new(kind, error.into()) + } + + fn _new(kind: ErrorKind, error: Box<dyn error::Error + Send + Sync>) -> Error { + Error { + repr: Repr::Custom(Box::new(Custom { kind, error })), + } + } + + pub fn get_ref(&self) -> Option<&(dyn error::Error + Send + Sync + 'static)> { + match self.repr { + Repr::Other(..) => None, + Repr::Simple(..) => None, + Repr::Custom(ref c) => Some(&*c.error), + } + } + + pub fn get_mut(&mut self) -> Option<&mut (dyn error::Error + Send + Sync + 'static)> { + match self.repr { + Repr::Other(..) => None, + Repr::Simple(..) => None, + Repr::Custom(ref mut c) => Some(&mut *c.error), + } + } + + pub fn into_inner(self) -> Option<Box<dyn error::Error + Send + Sync>> { + match self.repr { + Repr::Other(..) => None, + Repr::Simple(..) 
=> None, + Repr::Custom(c) => Some(c.error), + } + } + + pub fn kind(&self) -> ErrorKind { + match self.repr { + Repr::Other(_) => ErrorKind::Other, + Repr::Custom(ref c) => c.kind, + Repr::Simple(kind) => kind, + } + } +} + +impl fmt::Debug for Repr { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &*self { + Repr::Other(s) => write!(fmt, "{}", s), + Repr::Custom(ref c) => fmt::Debug::fmt(&c, fmt), + Repr::Simple(kind) => fmt.debug_tuple("Kind").field(&kind).finish(), + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Other(s) => write!(fmt, "{}", s), + Repr::Custom(ref c) => c.error.fmt(fmt), + Repr::Simple(kind) => write!(fmt, "{}", kind.as_str()), + } + } +} + +impl error::Error for Error { + fn description(&self) -> &str { + match &self.repr { + Repr::Other(s) => &s[..], + Repr::Simple(..) => self.kind().as_str(), + Repr::Custom(ref c) => c.error.description(), + } + } + + fn cause(&self) -> Option<&dyn error::Error> { + match self.repr { + Repr::Other(..) => None, + Repr::Simple(..) => None, + Repr::Custom(ref c) => c.error.source(), + } + } + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self.repr { + Repr::Other(..) => None, + Repr::Simple(..) 
=> None, + Repr::Custom(ref c) => c.error.source(), + } + } +} diff --git a/xds/src/protocol/message.rs b/xds/src/protocol/message.rs new file mode 100644 index 0000000..1895172 --- /dev/null +++ b/xds/src/protocol/message.rs @@ -0,0 +1,468 @@ +use byteorder::{BigEndian, ByteOrder}; +use enum_primitive_derive::Primitive; +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; +use num_traits::{FromPrimitive, ToPrimitive}; +use strum_macros::{Display, EnumIter, EnumString}; + +use std::{ + cell::RefCell, + collections::hash_map::HashMap, + io::{Read, Write}, +}; + +use super::error::*; + + +const MAGIC_NUMBER: u8 = 0x08; +pub const SERVICE_ERROR: &str = "__rpcx_error__"; + +#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)] +pub enum MessageType { + Request = 0, + Response = 1, +} + +#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)] +pub enum MessageStatusType { + Normal = 0, + Error = 1, +} + +#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)] +pub enum CompressType { + CompressNone = 0, + Gzip = 1, +} + +#[derive(Debug, Copy, Clone, Display, PartialEq, EnumIter, EnumString, Primitive)] +pub enum SerializeType { + SerializeNone = 0, + JSON = 1, + Protobuf = 2, + MsgPack = 3, + Thrift = 4, +} + +/// define the rpcx message interface. 
+pub trait RpcxMessage { + fn check_magic_number(&self) -> bool; + fn get_version(&self) -> u8; + fn set_version(&mut self, v: u8); + fn get_message_type(&self) -> Option<MessageType>; + fn set_message_type(&mut self, mt: MessageType); + fn is_heartbeat(&self) -> bool; + fn set_heartbeat(&mut self, b: bool); + fn is_oneway(&self) -> bool; + fn set_oneway(&mut self, b: bool); + fn get_compress_type(&self) -> Option<CompressType>; + fn set_compress_type(&mut self, ct: CompressType); + fn get_message_status_type(&self) -> Option<MessageStatusType>; + fn set_message_status_type(&mut self, mst: MessageStatusType); + fn get_serialize_type(&self) -> Option<SerializeType>; + fn set_serialize_type(&mut self, st: SerializeType); + fn get_seq(&self) -> u64; + fn set_seq(&mut self, seq: u64); + fn decode<R: ?Sized>(&mut self, r: &mut R) -> Result<()> + where + R: Read; + fn encode(&self) -> Vec<u8>; + + fn get_error(&self) -> Option<String>; +} + +pub type Metadata = HashMap<String, String>; + +/// a common struct for request and response. 
+#[derive(Debug, Default)] +pub struct Message { + pub header: [u8; 12], + pub service_path: String, + pub service_method: String, + pub metadata: RefCell<Metadata>, + pub payload: Vec<u8>, +} +impl Message { + /// Creates a new `Message` + pub fn new() -> Self { + let mut msg: Message = Default::default(); + msg.header = [0u8; 12]; + msg.header[0] = MAGIC_NUMBER; + msg.metadata = RefCell::new(HashMap::new()); + msg + } + + pub fn get_reply(&self) -> Result<Self> { + let mut reply = Message::new(); + reply.set_version(self.get_version()); + reply.set_compress_type(self.get_compress_type().unwrap()); + reply.set_message_status_type(MessageStatusType::Normal); + reply.set_message_type(MessageType::Response); + reply.set_serialize_type(self.get_serialize_type().unwrap()); + reply.set_seq(self.get_seq()); + reply.service_path = self.service_path.clone(); + reply.service_method = self.service_method.clone(); + + Ok(reply) + } +} + +impl RpcxMessage for Message { + fn check_magic_number(&self) -> bool { + self.header[0] == MAGIC_NUMBER + } + + fn get_version(&self) -> u8 { + self.header[1] + } + fn set_version(&mut self, v: u8) { + self.header[1] = v; + } + + fn get_message_type(&self) -> Option<MessageType> { + MessageType::from_u8((self.header[2] & 0x80) >> 7 as u8) + } + fn set_message_type(&mut self, mt: MessageType) { + self.header[2] |= mt.to_u8().unwrap() << 7; + } + fn is_heartbeat(&self) -> bool { + self.header[2] & 0x40 == 0x40 + } + fn set_heartbeat(&mut self, b: bool) { + if b { + self.header[2] |= 0x40; + } else { + self.header[2] &= !0x40; + } + } + fn is_oneway(&self) -> bool { + self.header[2] & 0x20 == 0x20 + } + fn set_oneway(&mut self, b: bool) { + if b { + self.header[2] |= 0x20; + } else { + self.header[2] &= !0x20; + } + } + fn get_compress_type(&self) -> Option<CompressType> { + CompressType::from_u8((self.header[2] & 0x1C) >> 2) + } + fn set_compress_type(&mut self, ct: CompressType) { + self.header[2] = (self.header[2] & !0x1C) | 
(ct.to_u8().unwrap() << 2 & 0x1C); + } + fn get_message_status_type(&self) -> Option<MessageStatusType> { + MessageStatusType::from_u8(self.header[2] & 0x03) + } + fn set_message_status_type(&mut self, mst: MessageStatusType) { + self.header[2] = (self.header[2] & !0x03) | (mst.to_u8().unwrap() & 0x03); + } + fn get_serialize_type(&self) -> Option<SerializeType> { + SerializeType::from_u8((self.header[3] & 0xF0) >> 4) + } + fn set_serialize_type(&mut self, st: SerializeType) { + self.header[3] = (self.header[3] & !0xF0) | (st.to_u8().unwrap() << 4) + } + fn get_seq(&self) -> u64 { + u64_from_slice(&(self.header[4..])) + } + fn set_seq(&mut self, seq: u64) { + u64_to_slice(seq, &mut self.header[4..]); + } + + fn decode<R: ?Sized>(&mut self, r: &mut R) -> Result<()> + where + R: Read, + { + r.read_exact(&mut self.header)?; + + let mut buf = [0u8; 4]; + r.read(&mut buf[..]).map(|_| {})?; + let len = BigEndian::read_u32(&buf); //length of all except header + let mut buf = vec![0u8; len as usize]; + r.read(&mut buf[..]).map(|_| ())?; + + let mut start = 0; + // read service_path + let len = read_len(&buf[start..(start + 4)]) as usize; + let service_path = read_str(&buf[(start + 4)..(start + 4 + len)])?; + self.service_path = service_path; + start = start + 4 + len; + // read service_method + let len = read_len(&buf[start..(start + 4)]) as usize; + let service_method = read_str(&buf[(start + 4)..(start + 4 + len)])?; + self.service_method = service_method; + + start = start + 4 + len; + //metadata + let len = read_len(&buf[start..(start + 4)]) as usize; + let metadata_bytes = &buf[(start + 4)..(start + 4 + len)]; + let mut meta_start = 0; + while meta_start < len { + let sl = read_len(&metadata_bytes[meta_start..(meta_start + 4)]) as usize; + let key = read_str(&metadata_bytes[(meta_start + 4)..(meta_start + 4 + sl)])?; + meta_start = meta_start + 4 + sl; + if meta_start < len { + let value_len = read_len(&metadata_bytes[meta_start..(meta_start + 4)]) as usize; + let 
value = + read_str(&metadata_bytes[(meta_start + 4)..(meta_start + 4 + value_len)])?; + self.metadata.borrow_mut().insert(key, value); + meta_start = meta_start + 4 + value_len; + } else { + self.metadata.borrow_mut().insert(key, String::new()); + break; + } + } + start = start + 4 + len; + // payload + let len = read_len(&buf[start..start + 4]) as usize; + let payload = &buf[start + 4..]; + if len != payload.len() { + return Err(Error::from("invalid payload length")); + } + + let mut vp = Vec::with_capacity(payload.len()); + match self.get_compress_type().unwrap() { + CompressType::Gzip => { + let mut deflater = GzDecoder::new(payload); + deflater.read_to_end(&mut vp)?; + } + CompressType::CompressNone => { + vp.extend_from_slice(&payload); + } + } + self.payload = vp; + + Ok(()) + } + + fn encode(&self) -> Vec<u8> { + // encode all except header + let mut buf = Vec::<u8>::with_capacity(20); + buf.extend_from_slice(&self.header); + + // push fake length + let len_bytes = write_len(0); + buf.extend_from_slice(&len_bytes); + + // service_path + let len = self.service_path.len(); + let len_bytes = write_len(len as u32); + buf.extend_from_slice(&len_bytes); + buf.extend_from_slice(self.service_path.as_bytes()); + + // service_method + let len = self.service_method.len(); + let len_bytes = write_len(len as u32); + buf.extend_from_slice(&len_bytes); + buf.extend_from_slice(self.service_method.as_bytes()); + + // metadata + let mut metadata_bytes = Vec::<u8>::new(); + let metadata = self.metadata.borrow_mut(); + for meta in metadata.iter() { + let key = meta.0; + let len_bytes = write_len(key.len() as u32); + metadata_bytes.extend_from_slice(&len_bytes); + metadata_bytes.extend_from_slice(key.as_bytes()); + + let value = meta.1; + let len_bytes = write_len(value.len() as u32); + metadata_bytes.extend_from_slice(&len_bytes); + metadata_bytes.extend_from_slice(value.as_bytes()); + } + let len = metadata_bytes.len(); + let len_bytes = write_len(len as u32); + 
buf.extend_from_slice(&len_bytes); + buf.append(&mut metadata_bytes); + + // data + // check compress + + match self.get_compress_type().unwrap() { + CompressType::Gzip => { + let mut e = GzEncoder::new(Vec::new(), Compression::fast()); + let _ = e.write_all(&self.payload[..]); + let compressed_payload = e.finish().unwrap(); + let len = compressed_payload.len(); + let len_bytes = write_len(len as u32); + buf.extend_from_slice(&len_bytes); + buf.extend_from_slice(&compressed_payload); + } + _ => { + let len = self.payload.len(); + let len_bytes = write_len(len as u32); + buf.extend_from_slice(&len_bytes); + buf.extend_from_slice(&self.payload); + } + } + + // set the real length + let len = buf.len() - 12 - 4; + let len_bytes = write_len(len as u32); + buf[12] = len_bytes[0]; + buf[13] = len_bytes[1]; + buf[14] = len_bytes[2]; + buf[15] = len_bytes[3]; + + buf + } + + fn get_error(&self) -> Option<String> { + match self.get_message_status_type() { + Some(MessageStatusType::Error) => { + let metadata = &self.metadata; + let metadata2 = metadata.borrow(); + let err_msg = metadata2.get(&SERVICE_ERROR.to_owned())?; + Some(String::from(err_msg)) + } + _ => None, + } + } +} + +fn read_len(buf: &[u8]) -> u32 { + BigEndian::read_u32(&buf[..4]) +} + +fn write_len(len: u32) -> [u8; 4] { + let mut buf = [0u8; 4]; + BigEndian::write_u32(&mut buf, len); + buf +} + +fn read_str(buf: &[u8]) -> Result<String> { + let s = std::str::from_utf8(&buf).unwrap(); + let str: String = std::string::String::from(s); + Ok(str) +} + +fn u64_from_slice(b: &[u8]) -> u64 { + BigEndian::read_u64(b) +} + +fn u64_to_slice(v: u64, b: &mut [u8]) { + BigEndian::write_u64(b, v); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_header() { + let msg_data: Vec<u8> = vec![ + 8, 0, 0, 16, 0, 0, 0, 0, 73, 150, 2, 210, 0, 0, 0, 98, 0, 0, 0, 5, 65, 114, 105, 116, + 104, 0, 0, 0, 3, 65, 100, 100, 0, 0, 0, 48, 0, 0, 0, 4, 95, 95, 73, 68, 0, 0, 0, 36, + 54, 98, 97, 55, 98, 56, 49, 48, 45, 57, 
100, 97, 100, 45, 49, 49, 100, 49, 45, 56, 48, + 98, 52, 45, 48, 48, 99, 48, 52, 102, 100, 52, 51, 48, 99, 57, 0, 0, 0, 26, 123, 10, 9, + 9, 34, 65, 34, 58, 32, 49, 44, 10, 9, 9, 34, 66, 34, 58, 32, 50, 44, 10, 9, 125, 10, 9, + ]; + + let mut msg = Message::new(); + (&mut msg.header).copy_from_slice(&msg_data[..12]); + + assert_eq!(true, msg.check_magic_number()); + assert_eq!(0, msg.get_version()); + assert_eq!(MessageType::Request, msg.get_message_type().unwrap()); + assert_eq!(false, msg.is_heartbeat()); + assert_eq!(false, msg.is_oneway()); + assert_eq!(CompressType::CompressNone, msg.get_compress_type().unwrap()); + assert_eq!( + MessageStatusType::Normal, + msg.get_message_status_type().unwrap() + ); + assert_eq!(SerializeType::JSON, msg.get_serialize_type().unwrap()); + assert_eq!(1234567890, msg.get_seq()); + } + + #[test] + fn set_header() { + let msg_data: Vec<u8> = vec![ + 8, 0, 0, 16, 0, 0, 0, 0, 73, 150, 2, 210, 0, 0, 0, 98, 0, 0, 0, 5, 65, 114, 105, 116, + 104, 0, 0, 0, 3, 65, 100, 100, 0, 0, 0, 48, 0, 0, 0, 4, 95, 95, 73, 68, 0, 0, 0, 36, + 54, 98, 97, 55, 98, 56, 49, 48, 45, 57, 100, 97, 100, 45, 49, 49, 100, 49, 45, 56, 48, + 98, 52, 45, 48, 48, 99, 48, 52, 102, 100, 52, 51, 48, 99, 57, 0, 0, 0, 26, 123, 10, 9, + 9, 34, 65, 34, 58, 32, 49, 44, 10, 9, 9, 34, 66, 34, 58, 32, 50, 44, 10, 9, 125, 10, 9, + ]; + + let mut msg = Message::new(); + msg.header.copy_from_slice(&msg_data[..12]); + + msg.set_version(0); + msg.set_message_type(MessageType::Response); + msg.set_heartbeat(true); + msg.set_oneway(true); + msg.set_compress_type(CompressType::Gzip); + msg.set_serialize_type(SerializeType::MsgPack); + msg.set_message_status_type(MessageStatusType::Normal); + msg.set_seq(1000000); + + assert_eq!(true, msg.check_magic_number()); + assert_eq!(0, msg.get_version()); + assert_eq!(MessageType::Response, msg.get_message_type().unwrap()); + assert_eq!(true, msg.is_heartbeat()); + assert_eq!(true, msg.is_oneway()); + assert_eq!(CompressType::Gzip, 
msg.get_compress_type().unwrap()); + assert_eq!( + MessageStatusType::Normal, + msg.get_message_status_type().unwrap() + ); + assert_eq!(SerializeType::MsgPack, msg.get_serialize_type().unwrap()); + assert_eq!(1000000, msg.get_seq()); + } + + #[test] + fn decode() { + let msg_data: [u8; 114] = [ + 8, 0, 0, 16, 0, 0, 0, 0, 73, 150, 2, 210, 0, 0, 0, 98, 0, 0, 0, 5, 65, 114, 105, 116, + 104, 0, 0, 0, 3, 65, 100, 100, 0, 0, 0, 48, 0, 0, 0, 4, 95, 95, 73, 68, 0, 0, 0, 36, + 54, 98, 97, 55, 98, 56, 49, 48, 45, 57, 100, 97, 100, 45, 49, 49, 100, 49, 45, 56, 48, + 98, 52, 45, 48, 48, 99, 48, 52, 102, 100, 52, 51, 48, 99, 57, 0, 0, 0, 26, 123, 10, 9, + 9, 34, 65, 34, 58, 32, 49, 44, 10, 9, 9, 34, 66, 34, 58, 32, 50, 44, 10, 9, 125, 10, 9, + ]; + + let mut msg = Message::new(); + + let mut data = &msg_data[..] as &[u8]; + match msg.decode(&mut data) { + Err(err) => println!("failed to parse: {}", err), + Ok(()) => {} + } + + assert_eq!("Arith", msg.service_path); + assert_eq!("Add", msg.service_method); + + assert_eq!( + "6ba7b810-9dad-11d1-80b4-00c04fd430c9", + msg.metadata.borrow().get("__ID").unwrap() + ); + + assert_eq!( + "{\n\t\t\"A\": 1,\n\t\t\"B\": 2,\n\t}\n\t", + std::str::from_utf8(&msg.payload).unwrap() + ); + } + + #[test] + fn encode() { + let msg_data: [u8; 114] = [ + 8, 0, 0, 16, 0, 0, 0, 0, 73, 150, 2, 210, 0, 0, 0, 98, 0, 0, 0, 5, 65, 114, 105, 116, + 104, 0, 0, 0, 3, 65, 100, 100, 0, 0, 0, 48, 0, 0, 0, 4, 95, 95, 73, 68, 0, 0, 0, 36, + 54, 98, 97, 55, 98, 56, 49, 48, 45, 57, 100, 97, 100, 45, 49, 49, 100, 49, 45, 56, 48, + 98, 52, 45, 48, 48, 99, 48, 52, 102, 100, 52, 51, 48, 99, 57, 0, 0, 0, 26, 123, 10, 9, + 9, 34, 65, 34, 58, 32, 49, 44, 10, 9, 9, 34, 66, 34, 58, 32, 50, 44, 10, 9, 125, 10, 9, + ]; + + let mut msg = Message::new(); + + let mut data = &msg_data[..] 
as &[u8]; + match msg.decode(&mut data) { + Err(err) => println!("failed to parse: {}", err), + Ok(()) => {} + } + + let encoded_bytes = msg.encode(); + + assert_eq!(&msg_data[..], &encoded_bytes[..]); + } +} diff --git a/xds/src/protocol/mod.rs b/xds/src/protocol/mod.rs new file mode 100644 index 0000000..1d650a8 --- /dev/null +++ b/xds/src/protocol/mod.rs @@ -0,0 +1,6 @@ +pub mod error; +pub mod message; + + +pub use message::*; +pub use error::*; \ No newline at end of file diff --git a/xds/src/server/mod.rs b/xds/src/server/mod.rs index 2944f98..f7d8248 100644 --- a/xds/src/server/mod.rs +++ b/xds/src/server/mod.rs @@ -14,3 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +pub(crate) mod server; +pub use self::server::*; + diff --git a/xds/src/server/server.rs b/xds/src/server/server.rs index 2944f98..be343fb 100644 --- a/xds/src/server/server.rs +++ b/xds/src/server/server.rs @@ -14,3 +14,79 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + +use std::convert::Infallible; +use std::env; +use std::process::Output; +use std::task::Poll; +use hyper::client::service; +use hyper::{Body, Request, Response, Server as hyper_server}; +use hyper::service::{make_service_fn, service_fn}; +use tower::Service; +use futures::future::{ ready, Ready, BoxFuture}; +use std::net::SocketAddr; +use crate::protocol::message::*; + +pub struct RpcServer { + pub addr: SocketAddr +} + +impl RpcServer { + pub fn new(addr: SocketAddr) -> Self { + RpcServer { + addr + } + } + + pub async fn start(&self) { + env_logger::init(); + let make_service = make_service_fn(|_conn| async { + let svc = RPCHandler; + Ok::<_, Infallible>(svc) + }); + + let server = hyper_server::bind(&self.addr).serve(make_service); + println!("Listening on http://{}", self.addr); + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } + } +} + + +#[derive(Copy, Clone)] +struct RPCHandler; +use std::result::Result as StdResult; + +impl Service<Request<Body>> for RPCHandler { + type Response = Response<Body>; + type Error = Infallible; + type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<StdResult<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + Box::pin(async move { + let whole_body = hyper::body::to_bytes(req.into_body()).await.unwrap(); + + let mut msg = Message::new(); + let mut data = &whole_body[..]; + + match msg.decode(&mut data) { + Ok(()) => { + let service_path = &msg.service_path; + let service_method = &msg.service_method; + let key = format!("{}.{}", service_path, service_method); + println!("recieved request success, and body message decode key is: {:?}", key); + } + Err(err) => { + eprintln!("failed to read: {}", err.to_string()); + } + } + let resp = ready(Ok(Response::new(Body::from("hello world with tower service! \n")))).await; + resp + }) + } +}
