http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/src/transport/mem.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/transport/mem.rs b/lib/rs/src/transport/mem.rs index 97ec503..86ac6bb 100644 --- a/lib/rs/src/transport/mem.rs +++ b/lib/rs/src/transport/mem.rs @@ -17,9 +17,11 @@ use std::cmp; use std::io; +use std::sync::{Arc, Mutex}; -/// Simple transport that contains both a fixed-length internal read buffer and -/// a fixed-length internal write buffer. +use super::{ReadHalf, TIoChannel, WriteHalf}; + +/// In-memory read and write channel with fixed-size read and write buffers. /// /// On a `write` bytes are written to the internal write buffer. Writes are no /// longer accepted once this buffer is full. Callers must `empty_write_buffer()` @@ -29,37 +31,61 @@ use std::io; /// `set_readable_bytes(...)`. Callers can then read until the buffer is /// depleted. No further reads are accepted until the internal read buffer is /// replenished again. -pub struct TBufferTransport { - rbuf: Box<[u8]>, - rpos: usize, - ridx: usize, - rcap: usize, - wbuf: Box<[u8]>, - wpos: usize, - wcap: usize, +#[derive(Debug)] +pub struct TBufferChannel { + read: Arc<Mutex<ReadData>>, + write: Arc<Mutex<WriteData>>, +} + +#[derive(Debug)] +struct ReadData { + buf: Box<[u8]>, + pos: usize, + idx: usize, + cap: usize, } -impl TBufferTransport { - /// Constructs a new, empty `TBufferTransport` with the given +#[derive(Debug)] +struct WriteData { + buf: Box<[u8]>, + pos: usize, + cap: usize, +} + +impl TBufferChannel { + /// Constructs a new, empty `TBufferChannel` with the given /// read buffer capacity and write buffer capacity. - pub fn with_capacity(read_buffer_capacity: usize, - write_buffer_capacity: usize) - -> TBufferTransport { - TBufferTransport { - rbuf: vec![0; read_buffer_capacity].into_boxed_slice(), - ridx: 0, - rpos: 0, - rcap: read_buffer_capacity, - wbuf: vec![0; write_buffer_capacity].into_boxed_slice(), - wpos: 0, - wcap: write_buffer_capacity, + pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel { + TBufferChannel { + read: Arc::new( + Mutex::new( + ReadData { + buf: vec![0; read_capacity].into_boxed_slice(), + idx: 0, + pos: 0, + cap: read_capacity, + }, + ), + ), + write: Arc::new( + Mutex::new( + WriteData { + buf: vec![0; write_capacity].into_boxed_slice(), + pos: 0, + cap: write_capacity, + }, + ), + ), } } - /// Return a slice containing the bytes held by the internal read buffer. - /// Returns an empty slice if no readable bytes are present. - pub fn read_buffer(&self) -> &[u8] { - &self.rbuf[..self.ridx] + /// Return a copy of the bytes held by the internal read buffer. + /// Returns an empty vector if no readable bytes are present. + pub fn read_bytes(&self) -> Vec<u8> { + let rdata = self.read.as_ref().lock().unwrap(); + let mut buf = vec![0u8; rdata.idx]; + buf.copy_from_slice(&rdata.buf[..rdata.idx]); + buf } // FIXME: do I really need this API call? @@ -68,8 +94,9 @@ impl TBufferTransport { /// /// Subsequent calls to `read` will return nothing. pub fn empty_read_buffer(&mut self) { - self.rpos = 0; - self.ridx = 0; + let mut rdata = self.read.as_ref().lock().unwrap(); + rdata.pos = 0; + rdata.idx = 0; } /// Copy bytes from the source buffer `buf` into the internal read buffer, @@ -77,37 +104,36 @@ impl TBufferTransport { /// which is `min(buf.len(), internal_read_buf.len())`. pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize { self.empty_read_buffer(); - let max_bytes = cmp::min(self.rcap, buf.len()); - self.rbuf[..max_bytes].clone_from_slice(&buf[..max_bytes]); - self.ridx = max_bytes; + let mut rdata = self.read.as_ref().lock().unwrap(); + let max_bytes = cmp::min(rdata.cap, buf.len()); + rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]); + rdata.idx = max_bytes; max_bytes } - /// Return a slice containing the bytes held by the internal write buffer. - /// Returns an empty slice if no bytes were written. - pub fn write_buffer_as_ref(&self) -> &[u8] { - &self.wbuf[..self.wpos] - } - - /// Return a vector with a copy of the bytes held by the internal write buffer. + /// Return a copy of the bytes held by the internal write buffer. /// Returns an empty vector if no bytes were written. - pub fn write_buffer_to_vec(&self) -> Vec<u8> { - let mut buf = vec![0u8; self.wpos]; - buf.copy_from_slice(&self.wbuf[..self.wpos]); + pub fn write_bytes(&self) -> Vec<u8> { + let wdata = self.write.as_ref().lock().unwrap(); + let mut buf = vec![0u8; wdata.pos]; + buf.copy_from_slice(&wdata.buf[..wdata.pos]); buf } /// Resets the internal write buffer, making it seem like no bytes were - /// written. Calling `write_buffer` after this returns an empty slice. + /// written. Calling `write_buffer` after this returns an empty vector. pub fn empty_write_buffer(&mut self) { - self.wpos = 0; + let mut wdata = self.write.as_ref().lock().unwrap(); + wdata.pos = 0; } /// Overwrites the contents of the read buffer with the contents of the /// write buffer. The write buffer is emptied after this operation. pub fn copy_write_buffer_to_read_buffer(&mut self) { + // FIXME: redo this entire method let buf = { - let b = self.write_buffer_as_ref(); + let wdata = self.write.as_ref().lock().unwrap(); + let b = &wdata.buf[..wdata.pos]; let mut b_ret = vec![0; b.len()]; b_ret.copy_from_slice(b); b_ret @@ -120,20 +146,45 @@ impl TBufferTransport { } } -impl io::Read for TBufferTransport { +impl TIoChannel for TBufferChannel { + fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)> + where + Self: Sized, + { + Ok( + (ReadHalf { + handle: TBufferChannel { + read: self.read.clone(), + write: self.write.clone(), + }, + }, + WriteHalf { + handle: TBufferChannel { + read: self.read.clone(), + write: self.write.clone(), + }, + }), + ) + } +} + +impl io::Read for TBufferChannel { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - let nread = cmp::min(buf.len(), self.ridx - self.rpos); - buf[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]); - self.rpos += nread; + let mut rdata = self.read.as_ref().lock().unwrap(); + let nread = cmp::min(buf.len(), rdata.idx - rdata.pos); + buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]); + rdata.pos += nread; Ok(nread) } } -impl io::Write for TBufferTransport { +impl io::Write for TBufferChannel { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - let nwrite = cmp::min(buf.len(), self.wcap - self.wpos); - self.wbuf[self.wpos..self.wpos + nwrite].clone_from_slice(&buf[..nwrite]); - self.wpos += nwrite; + let mut wdata = self.write.as_ref().lock().unwrap(); + let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos); + let (start, end) = (wdata.pos, wdata.pos + nwrite); + wdata.buf[start..end].clone_from_slice(&buf[..nwrite]); + wdata.pos += nwrite; Ok(nwrite) } @@ -146,68 +197,68 @@ impl io::Write for TBufferTransport { mod tests { use std::io::{Read, Write}; - use super::TBufferTransport; + use super::TBufferChannel; #[test] fn must_empty_write_buffer() { - let mut t = TBufferTransport::with_capacity(0, 1); + let mut t = TBufferChannel::with_capacity(0, 1); let bytes_to_write: [u8; 1] = [0x01]; let result = t.write(&bytes_to_write); assert_eq!(result.unwrap(), 1); - assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); + assert_eq!(&t.write_bytes(), &bytes_to_write); t.empty_write_buffer(); - assert_eq!(t.write_buffer_as_ref().len(), 0); + assert_eq!(t.write_bytes().len(), 0); } #[test] fn must_accept_writes_after_buffer_emptied() { - let mut t = TBufferTransport::with_capacity(0, 2); + let mut t = TBufferChannel::with_capacity(0, 2); let bytes_to_write: [u8; 2] = [0x01, 0x02]; // first write (all bytes written) let result = t.write(&bytes_to_write); assert_eq!(result.unwrap(), 2); - assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); + assert_eq!(&t.write_bytes(), &bytes_to_write); // try write again (nothing should be written) let result = t.write(&bytes_to_write); assert_eq!(result.unwrap(), 0); - assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); // still the same as before + assert_eq!(&t.write_bytes(), &bytes_to_write); // still the same as before // now reset the buffer t.empty_write_buffer(); - assert_eq!(t.write_buffer_as_ref().len(), 0); + assert_eq!(t.write_bytes().len(), 0); // now try write again - the write should succeed let result = t.write(&bytes_to_write); assert_eq!(result.unwrap(), 2); - assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); + assert_eq!(&t.write_bytes(), &bytes_to_write); } #[test] fn must_accept_multiple_writes_until_buffer_is_full() { - let mut t = TBufferTransport::with_capacity(0, 10); + let mut t = TBufferChannel::with_capacity(0, 10); // first write (all bytes written) let bytes_to_write_0: [u8; 2] = [0x01, 0x41]; let write_0_result = t.write(&bytes_to_write_0); assert_eq!(write_0_result.unwrap(), 2); - assert_eq!(t.write_buffer_as_ref(), &bytes_to_write_0); + assert_eq!(t.write_bytes(), &bytes_to_write_0); // second write (all bytes written, starting at index 2) let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF]; let write_1_result = t.write(&bytes_to_write_1); assert_eq!(write_1_result.unwrap(), 7); - assert_eq!(&t.write_buffer_as_ref()[2..], &bytes_to_write_1); + assert_eq!(&t.write_bytes()[2..], &bytes_to_write_1); // third write (only 1 byte written - that's all we have space for) let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98]; let write_2_result = t.write(&bytes_to_write_2); assert_eq!(write_2_result.unwrap(), 1); - assert_eq!(&t.write_buffer_as_ref()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?! + assert_eq!(&t.write_bytes()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?! // fourth write (no writes are accepted) let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD]; @@ -219,50 +270,50 @@ mod tests { expected.extend_from_slice(&bytes_to_write_0); expected.extend_from_slice(&bytes_to_write_1); expected.extend_from_slice(&bytes_to_write_2[0..1]); - assert_eq!(t.write_buffer_as_ref(), &expected[..]); + assert_eq!(t.write_bytes(), &expected[..]); } #[test] fn must_empty_read_buffer() { - let mut t = TBufferTransport::with_capacity(1, 0); + let mut t = TBufferChannel::with_capacity(1, 0); let bytes_to_read: [u8; 1] = [0x01]; let result = t.set_readable_bytes(&bytes_to_read); assert_eq!(result, 1); - assert_eq!(&t.read_buffer(), &bytes_to_read); + assert_eq!(t.read_bytes(), &bytes_to_read); t.empty_read_buffer(); - assert_eq!(t.read_buffer().len(), 0); + assert_eq!(t.read_bytes().len(), 0); } #[test] fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() { - let mut t = TBufferTransport::with_capacity(1, 0); + let mut t = TBufferChannel::with_capacity(1, 0); let bytes_to_read_0: [u8; 1] = [0x01]; let result = t.set_readable_bytes(&bytes_to_read_0); assert_eq!(result, 1); - assert_eq!(&t.read_buffer(), &bytes_to_read_0); + assert_eq!(t.read_bytes(), &bytes_to_read_0); t.empty_read_buffer(); - assert_eq!(t.read_buffer().len(), 0); + assert_eq!(t.read_bytes().len(), 0); let bytes_to_read_1: [u8; 1] = [0x02]; let result = t.set_readable_bytes(&bytes_to_read_1); assert_eq!(result, 1); - assert_eq!(&t.read_buffer(), &bytes_to_read_1); + assert_eq!(t.read_bytes(), &bytes_to_read_1); } #[test] fn must_accept_multiple_reads_until_all_bytes_read() { - let mut t = TBufferTransport::with_capacity(10, 0); + let mut t = TBufferChannel::with_capacity(10, 0); let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D]; // check that we're able to set the bytes to be read let result = t.set_readable_bytes(&readable_bytes); assert_eq!(result, 10); - assert_eq!(&t.read_buffer(), &readable_bytes); + assert_eq!(t.read_bytes(), &readable_bytes); // first read let mut read_buf_0 = vec![0; 5]; @@ -300,21 +351,21 @@ mod tests { #[test] fn must_allow_reads_to_succeed_after_read_buffer_replenished() { - let mut t = TBufferTransport::with_capacity(3, 0); + let mut t = TBufferChannel::with_capacity(3, 0); let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33]; // check that we're able to set the bytes to be read let result = t.set_readable_bytes(&readable_bytes_0); assert_eq!(result, 3); - assert_eq!(&t.read_buffer(), &readable_bytes_0); + assert_eq!(t.read_bytes(), &readable_bytes_0); let mut read_buf = vec![0; 4]; // drain the read buffer let read_result = t.read(&mut read_buf); assert_eq!(read_result.unwrap(), 3); - assert_eq!(t.read_buffer(), &read_buf[0..3]); + assert_eq!(t.read_bytes(), &read_buf[0..3]); // check that a subsequent read fails let read_result = t.read(&mut read_buf); @@ -332,11 +383,11 @@ mod tests { // check that we're able to set the bytes to be read let result = t.set_readable_bytes(&readable_bytes_1); assert_eq!(result, 2); - assert_eq!(&t.read_buffer(), &readable_bytes_1); + assert_eq!(t.read_bytes(), &readable_bytes_1); // read again let read_result = t.read(&mut read_buf); assert_eq!(read_result.unwrap(), 2); - assert_eq!(t.read_buffer(), &read_buf[0..2]); + assert_eq!(t.read_bytes(), &read_buf[0..2]); } }
http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/src/transport/mod.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/transport/mod.rs b/lib/rs/src/transport/mod.rs index 1c39f50..9392786 100644 --- a/lib/rs/src/transport/mod.rs +++ b/lib/rs/src/transport/mod.rs @@ -15,37 +15,266 @@ // specific language governing permissions and limitations // under the License. -//! Types required to send and receive bytes over an I/O channel. +//! Types used to send and receive bytes over an I/O channel. //! -//! The core type is the `TTransport` trait, through which a `TProtocol` can -//! send and receive primitives over the wire. While `TProtocol` instances deal -//! with primitive types, `TTransport` instances understand only bytes. +//! The core types are the `TReadTransport`, `TWriteTransport` and the +//! `TIoChannel` traits, through which `TInputProtocol` or +//! `TOutputProtocol` can receive and send primitives over the wire. While +//! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives +//! the types in this module understand only bytes. -use std::cell::RefCell; use std::io; -use std::rc::Rc; +use std::io::{Read, Write}; +use std::ops::{Deref, DerefMut}; + +#[cfg(test)] +macro_rules! assert_eq_transport_num_written_bytes { + ($transport:ident, $num_written_bytes:expr) => { + { + assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes); + } + }; +} + + +#[cfg(test)] +macro_rules! assert_eq_transport_written_bytes { + ($transport:ident, $expected_bytes:ident) => { + { + assert_eq!($transport.channel.write_bytes(), &$expected_bytes); + } + }; +} mod buffered; mod framed; -mod passthru; mod socket; +mod mem; + +pub use self::buffered::{TBufferedReadTransport, TBufferedReadTransportFactory, + TBufferedWriteTransport, TBufferedWriteTransportFactory}; +pub use self::framed::{TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport, + TFramedWriteTransportFactory}; +pub use self::mem::TBufferChannel; +pub use self::socket::TTcpChannel; + +/// Identifies a transport used by a `TInputProtocol` to receive bytes. +pub trait TReadTransport: Read {} + +/// Helper type used by a server to create `TReadTransport` instances for +/// accepted client connections. +pub trait TReadTransportFactory { + /// Create a `TTransport` that wraps a channel over which bytes are to be read. + fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send>; +} + +/// Identifies a transport used by `TOutputProtocol` to send bytes. +pub trait TWriteTransport: Write {} + +/// Helper type used by a server to create `TWriteTransport` instances for +/// accepted client connections. +pub trait TWriteTransportFactory { + /// Create a `TTransport` that wraps a channel over which bytes are to be sent. + fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send>; +} + +impl<T> TReadTransport for T +where + T: Read, +{ +} + +impl<T> TWriteTransport for T +where + T: Write, +{ +} + +// FIXME: implement the Debug trait for boxed transports + +impl<T> TReadTransportFactory for Box<T> +where + T: TReadTransportFactory + ?Sized, +{ + fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> { + (**self).create(channel) + } +} + +impl<T> TWriteTransportFactory for Box<T> +where + T: TWriteTransportFactory + ?Sized, +{ + fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> { + (**self).create(channel) + } +} + +/// Identifies a splittable bidirectional I/O channel used to send and receive bytes. +pub trait TIoChannel: Read + Write { + /// Split the channel into a readable half and a writable half, where the + /// readable half implements `io::Read` and the writable half implements + /// `io::Write`. Returns `None` if the channel was not initialized, or if it + /// cannot be split safely. + /// + /// Returned halves may share the underlying OS channel or buffer resources. + /// Implementations **should ensure** that these two halves can be safely + /// used independently by concurrent threads. + fn split(self) -> ::Result<(::transport::ReadHalf<Self>, ::transport::WriteHalf<Self>)> + where + Self: Sized; +} + +/// The readable half of an object returned from `TIoChannel::split`. +#[derive(Debug)] +pub struct ReadHalf<C> +where + C: Read, +{ + handle: C, +} + +/// The writable half of an object returned from `TIoChannel::split`. +#[derive(Debug)] +pub struct WriteHalf<C> +where + C: Write, +{ + handle: C, +} + +impl<C> Read for ReadHalf<C> +where + C: Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.handle.read(buf) + } +} + +impl<C> Write for WriteHalf<C> +where + C: Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.handle.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.handle.flush() + } +} + +impl<C> Deref for ReadHalf<C> +where + C: Read, +{ + type Target = C; + + fn deref(&self) -> &Self::Target { + &self.handle + } +} + +impl<C> DerefMut for ReadHalf<C> +where + C: Read, +{ + fn deref_mut(&mut self) -> &mut C { + &mut self.handle + } +} + +impl<C> Deref for WriteHalf<C> +where + C: Write, +{ + type Target = C; + + fn deref(&self) -> &Self::Target { + &self.handle + } +} + +impl<C> DerefMut for WriteHalf<C> +where + C: Write, +{ + fn deref_mut(&mut self) -> &mut C { + &mut self.handle + } +} + +#[cfg(test)] +mod tests { + + use std::io::Cursor; + + use super::*; + + #[test] + fn must_create_usable_read_channel_from_concrete_read_type() { + let r = Cursor::new([0, 1, 2]); + let _ = TBufferedReadTransport::new(r); + } + + #[test] + fn must_create_usable_read_channel_from_boxed_read() { + let r: Box<Read> = Box::new(Cursor::new([0, 1, 2])); + let _ = TBufferedReadTransport::new(r); + } + + #[test] + fn must_create_usable_write_channel_from_concrete_write_type() { + let w = vec![0u8; 10]; + let _ = TBufferedWriteTransport::new(w); + } + + #[test] + fn must_create_usable_write_channel_from_boxed_write() { + let w: Box<Write> = Box::new(vec![0u8; 10]); + let _ = TBufferedWriteTransport::new(w); + } + + #[test] + fn must_create_usable_read_transport_from_concrete_read_transport() { + let r = Cursor::new([0, 1, 2]); + let mut t = TBufferedReadTransport::new(r); + takes_read_transport(&mut t) + } + + #[test] + fn must_create_usable_read_transport_from_boxed_read() { + let r = Cursor::new([0, 1, 2]); + let mut t: Box<TReadTransport> = Box::new(TBufferedReadTransport::new(r)); + takes_read_transport(&mut t) + } -pub mod mem; + #[test] + fn must_create_usable_write_transport_from_concrete_write_transport() { + let w = vec![0u8; 10]; + let mut t = TBufferedWriteTransport::new(w); + takes_write_transport(&mut t) + } -pub use self::mem::TBufferTransport; -pub use self::buffered::{TBufferedTransport, TBufferedTransportFactory}; -pub use self::framed::{TFramedTransport, TFramedTransportFactory}; -pub use self::passthru::TPassThruTransport; -pub use self::socket::TTcpTransport; + #[test] + fn must_create_usable_write_transport_from_boxed_write() { + let w = vec![0u8; 10]; + let mut t: Box<TWriteTransport> = Box::new(TBufferedWriteTransport::new(w)); + takes_write_transport(&mut t) + } -/// Identifies an I/O channel that can be used to send and receive bytes. -pub trait TTransport: io::Read + io::Write {} -impl<I: io::Read + io::Write> TTransport for I {} + fn takes_read_transport<R>(t: &mut R) + where + R: TReadTransport, + { + t.bytes(); + } -/// Helper type used by servers to create `TTransport` instances for accepted -/// client connections. -pub trait TTransportFactory { - /// Create a `TTransport` that wraps an `inner` transport, thus creating - /// a transport stack. - fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport>; + fn takes_write_transport<W>(t: &mut W) + where + W: TWriteTransport, + { + t.flush().unwrap(); + } } http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/src/transport/passthru.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/transport/passthru.rs b/lib/rs/src/transport/passthru.rs deleted file mode 100644 index 60dc3a6..0000000 --- a/lib/rs/src/transport/passthru.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::cell::RefCell; -use std::rc::Rc; -use std::io; -use std::io::{Read, Write}; - -use super::TTransport; - -/// Proxy that wraps an inner `TTransport` and delegates all calls to it. -/// -/// Unlike other `TTransport` wrappers, `TPassThruTransport` is generic with -/// regards to the wrapped transport. This allows callers to use methods -/// specific to the type being wrapped instead of being constrained to methods -/// on the `TTransport` trait. -/// -/// # Examples -/// -/// Create and use a `TPassThruTransport`. -/// -/// ```no_run -/// use std::cell::RefCell; -/// use std::rc::Rc; -/// use thrift::transport::{TPassThruTransport, TTcpTransport}; -/// -/// let t = TTcpTransport::new(); -/// let t = TPassThruTransport::new(Rc::new(RefCell::new(Box::new(t)))); -/// -/// // since the type parameter is maintained, we are able -/// // to use functions specific to `TTcpTransport` -/// t.inner.borrow_mut().open("localhost:9090").unwrap(); -/// ``` -pub struct TPassThruTransport<I: TTransport> { - pub inner: Rc<RefCell<Box<I>>>, -} - -impl<I: TTransport> TPassThruTransport<I> { - /// Create a `TPassThruTransport` that wraps an `inner` TTransport. - pub fn new(inner: Rc<RefCell<Box<I>>>) -> TPassThruTransport<I> { - TPassThruTransport { inner: inner } - } -} - -impl<I: TTransport> Read for TPassThruTransport<I> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.inner.borrow_mut().read(buf) - } -} - -impl<I: TTransport> Write for TPassThruTransport<I> { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner.borrow_mut().write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.borrow_mut().flush() - } -} http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/src/transport/socket.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs index 9f2b8ba..16b59ef 100644 --- a/lib/rs/src/transport/socket.rs +++ b/lib/rs/src/transport/socket.rs @@ -21,69 +21,74 @@ use std::io::{ErrorKind, Read, Write}; use std::net::{Shutdown, TcpStream}; use std::ops::Drop; -use ::{TransportError, TransportErrorKind}; +use {TransportErrorKind, new_transport_error}; +use super::{ReadHalf, TIoChannel, WriteHalf}; -/// Communicate with a Thrift service over a TCP socket. +/// Bidirectional TCP/IP channel. /// /// # Examples /// -/// Create a `TTcpTransport`. +/// Create a `TTcpChannel`. /// /// ```no_run /// use std::io::{Read, Write}; -/// use thrift::transport::TTcpTransport; +/// use thrift::transport::TTcpChannel; /// -/// let mut t = TTcpTransport::new(); -/// t.open("localhost:9090").unwrap(); +/// let mut c = TTcpChannel::new(); +/// c.open("localhost:9090").unwrap(); /// /// let mut buf = vec![0u8; 4]; -/// t.read(&mut buf).unwrap(); -/// t.write(&vec![0, 1, 2]).unwrap(); +/// c.read(&mut buf).unwrap(); +/// c.write(&vec![0, 1, 2]).unwrap(); /// ``` /// -/// Create a `TTcpTransport` by wrapping an existing `TcpStream`. +/// Create a `TTcpChannel` by wrapping an existing `TcpStream`. /// /// ```no_run /// use std::io::{Read, Write}; /// use std::net::TcpStream; -/// use thrift::transport::TTcpTransport; +/// use thrift::transport::TTcpChannel; /// /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap(); -/// let mut t = TTcpTransport::with_stream(stream); /// -/// // no need to call t.open() since we've already connected above +/// // no need to call c.open() since we've already connected above +/// let mut c = TTcpChannel::with_stream(stream); /// /// let mut buf = vec![0u8; 4]; -/// t.read(&mut buf).unwrap(); -/// t.write(&vec![0, 1, 2]).unwrap(); +/// c.read(&mut buf).unwrap(); +/// c.write(&vec![0, 1, 2]).unwrap(); /// ``` -#[derive(Default)] -pub struct TTcpTransport { +#[derive(Debug, Default)] +pub struct TTcpChannel { stream: Option<TcpStream>, } -impl TTcpTransport { - /// Create an uninitialized `TTcpTransport`. +impl TTcpChannel { + /// Create an uninitialized `TTcpChannel`. /// - /// The returned instance must be opened using `TTcpTransport::open(...)` + /// The returned instance must be opened using `TTcpChannel::open(...)` /// before it can be used. - pub fn new() -> TTcpTransport { - TTcpTransport { stream: None } + pub fn new() -> TTcpChannel { + TTcpChannel { stream: None } } - /// Create a `TTcpTransport` that wraps an existing `TcpStream`. + /// Create a `TTcpChannel` that wraps an existing `TcpStream`. /// /// The passed-in stream is assumed to have been opened before being wrapped - /// by the created `TTcpTransport` instance. - pub fn with_stream(stream: TcpStream) -> TTcpTransport { - TTcpTransport { stream: Some(stream) } + /// by the created `TTcpChannel` instance. + pub fn with_stream(stream: TcpStream) -> TTcpChannel { + TTcpChannel { stream: Some(stream) } } /// Connect to `remote_address`, which should have the form `host:port`. pub fn open(&mut self, remote_address: &str) -> ::Result<()> { if self.stream.is_some() { - Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen, - "transport previously opened"))) + Err( + new_transport_error( + TransportErrorKind::AlreadyOpen, + "tcp connection previously opened", + ), + ) } else { match TcpStream::connect(&remote_address) { Ok(s) => { @@ -95,33 +100,62 @@ impl TTcpTransport { } } - /// Shutdown this transport. + /// Shut down this channel. /// /// Both send and receive halves are closed, and this instance can no /// longer be used to communicate with another endpoint. pub fn close(&mut self) -> ::Result<()> { - self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from) + self.if_set(|s| s.shutdown(Shutdown::Both)) + .map_err(From::from) } fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> - where F: FnMut(&mut TcpStream) -> io::Result<T> + where + F: FnMut(&mut TcpStream) -> io::Result<T>, { if let Some(ref mut s) = self.stream { stream_operation(s) } else { - Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected")) + Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),) } } } -impl Read for TTcpTransport { +impl TIoChannel for TTcpChannel { + fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)> + where + Self: Sized, + { + let mut s = self; + + s.stream + .as_mut() + .and_then(|s| s.try_clone().ok()) + .map( + |cloned| { + (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } }, + WriteHalf { handle: TTcpChannel { stream: Some(cloned) } }) + }, + ) + .ok_or_else( + || { + new_transport_error( + TransportErrorKind::Unknown, + "cannot clone underlying tcp stream", + ) + }, + ) + } +} + +impl Read for TTcpChannel { fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { self.if_set(|s| s.read(b)) } } -impl Write for TTcpTransport { +impl Write for TTcpChannel { fn write(&mut self, b: &[u8]) -> io::Result<usize> { self.if_set(|s| s.write(b)) } @@ -131,11 +165,11 @@ impl Write for TTcpTransport { } } -// Do I have to implement the Drop trait? TcpStream closes the socket on drop. -impl Drop for TTcpTransport { +// FIXME: Do I have to implement the Drop trait? TcpStream closes the socket on drop. +impl Drop for TTcpChannel { fn drop(&mut self) { if let Err(e) = self.close() { - warn!("error while closing socket transport: {:?}", e) + warn!("error while closing socket: {:?}", e) } } } http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/test/src/bin/kitchen_sink_client.rs ---------------------------------------------------------------------- diff --git a/lib/rs/test/src/bin/kitchen_sink_client.rs b/lib/rs/test/src/bin/kitchen_sink_client.rs index 27171be..9738298 100644 --- a/lib/rs/test/src/bin/kitchen_sink_client.rs +++ b/lib/rs/test/src/bin/kitchen_sink_client.rs @@ -21,13 +21,11 @@ extern crate clap; extern crate kitchen_sink; extern crate thrift; -use std::cell::RefCell; -use std::rc::Rc; - use kitchen_sink::base_two::{TNapkinServiceSyncClient, TRamenServiceSyncClient}; use kitchen_sink::midlayer::{MealServiceSyncClient, TMealServiceSyncClient}; use kitchen_sink::ultimate::{FullMealServiceSyncClient, TFullMealServiceSyncClient}; -use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; +use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel, + TTcpChannel, WriteHalf}; use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol, TInputProtocol, TOutputProtocol}; @@ -50,24 +48,25 @@ fn run() -> thrift::Result<()> { (@arg port: --port +takes_value "Port on which the Thrift test server is listening") (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")") (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")") - ).get_matches(); + ) + .get_matches(); let host = matches.value_of("host").unwrap_or("127.0.0.1"); let port = value_t!(matches, "port", u16).unwrap_or(9090); let protocol = matches.value_of("protocol").unwrap_or("compact"); let service = matches.value_of("service").unwrap_or("part"); - let t = open_tcp_transport(host, port)?; - let t = Rc::new(RefCell::new(Box::new(TFramedTransport::new(t)) as Box<TTransport>)); + let (i_chan, o_chan) = tcp_channel(host, port)?; + let (i_tran, o_tran) = (TFramedReadTransport::new(i_chan), TFramedWriteTransport::new(o_chan)); let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol { "binary" => { - (Box::new(TBinaryInputProtocol::new(t.clone(), true)), - Box::new(TBinaryOutputProtocol::new(t.clone(), true))) + (Box::new(TBinaryInputProtocol::new(i_tran, true)), + Box::new(TBinaryOutputProtocol::new(o_tran, true))) } "compact" => { - (Box::new(TCompactInputProtocol::new(t.clone())), - Box::new(TCompactOutputProtocol::new(t.clone()))) + (Box::new(TCompactInputProtocol::new(i_tran)), + Box::new(TCompactOutputProtocol::new(o_tran))) } unmatched => return Err(format!("unsupported protocol {}", unmatched).into()), }; @@ -75,28 +74,31 @@ fn run() -> thrift::Result<()> { run_client(service, i_prot, o_prot) } -fn run_client(service: &str, - i_prot: Box<TInputProtocol>, - o_prot: Box<TOutputProtocol>) - -> thrift::Result<()> { +fn run_client( + service: &str, + i_prot: Box<TInputProtocol>, + o_prot: Box<TOutputProtocol>, +) -> thrift::Result<()> { match service { "full" => run_full_meal_service(i_prot, o_prot), "part" => run_meal_service(i_prot, o_prot), - _ => Err(thrift::Error::from(format!("unknown service type {}", service))), + _ => Err(thrift::Error::from(format!("unknown service type {}", service)),), } } -fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> { - let mut t = TTcpTransport::new(); - match t.open(&format!("{}:{}", host, port)) { - Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))), - Err(e) => Err(e), - } +fn tcp_channel( + host: &str, + port: u16, +) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> { + let mut c = TTcpChannel::new(); + c.open(&format!("{}:{}", host, port))?; + c.split() } -fn run_meal_service(i_prot: Box<TInputProtocol>, - o_prot: Box<TOutputProtocol>) - -> thrift::Result<()> { +fn run_meal_service( + i_prot: Box<TInputProtocol>, + o_prot: Box<TOutputProtocol>, +) -> thrift::Result<()> { let mut client = MealServiceSyncClient::new(i_prot, o_prot); // client.full_meal(); // <-- IMPORTANT: if you uncomment this, compilation *should* fail @@ -110,9 +112,10 @@ fn run_meal_service(i_prot: Box<TInputProtocol>, Ok(()) } -fn run_full_meal_service(i_prot: Box<TInputProtocol>, - o_prot: Box<TOutputProtocol>) - -> thrift::Result<()> { +fn run_full_meal_service( + i_prot: Box<TInputProtocol>, + o_prot: Box<TOutputProtocol>, +) -> thrift::Result<()> { let mut client = FullMealServiceSyncClient::new(i_prot, o_prot); execute_call("full", "ramen", || client.ramen(100))?; @@ -124,17 +127,20 @@ fn run_full_meal_service(i_prot: Box<TInputProtocol>, } fn execute_call<F, R>(service_type: &str, call_name: &str, mut f: F) -> thrift::Result<()> - where F: FnMut() -> thrift::Result<R> +where + F: FnMut() -> thrift::Result<R>, { let res = f(); match res { Ok(_) => println!("{}: completed {} call", service_type, call_name), Err(ref e) => { - println!("{}: failed {} call with error {:?}", - service_type, - call_name, - e) + println!( + "{}: failed {} call with error {:?}", + service_type, + call_name, + e + ) } } http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/test/src/bin/kitchen_sink_server.rs ---------------------------------------------------------------------- diff --git a/lib/rs/test/src/bin/kitchen_sink_server.rs b/lib/rs/test/src/bin/kitchen_sink_server.rs index 4ce4fa3..19112cd 100644 --- a/lib/rs/test/src/bin/kitchen_sink_server.rs +++ b/lib/rs/test/src/bin/kitchen_sink_server.rs @@ -22,7 +22,7 @@ extern crate kitchen_sink; extern crate thrift; use kitchen_sink::base_one::Noodle; -use kitchen_sink::base_two::{Napkin, Ramen, NapkinServiceSyncHandler, RamenServiceSyncHandler}; +use kitchen_sink::base_two::{Napkin, NapkinServiceSyncHandler, Ramen, RamenServiceSyncHandler}; use kitchen_sink::midlayer::{Dessert, Meal, MealServiceSyncHandler, MealServiceSyncProcessor}; use kitchen_sink::ultimate::{Drink, FullMeal, FullMealAndDrinks, FullMealAndDrinksServiceSyncProcessor, FullMealServiceSyncHandler}; @@ -30,8 +30,9 @@ use kitchen_sink::ultimate::FullMealAndDrinksServiceSyncHandler; use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory, TCompactInputProtocolFactory, TCompactOutputProtocolFactory, TInputProtocolFactory, TOutputProtocolFactory}; -use thrift::transport::{TFramedTransportFactory, TTransportFactory}; -use thrift::server::TSimpleServer; +use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory, + TReadTransportFactory, TWriteTransportFactory}; +use thrift::server::TServer; fn main() { match run() { @@ -52,7 +53,8 @@ fn run() -> thrift::Result<()> { (@arg port: --port +takes_value "port on which the test server listens") (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")") (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")") - ).get_matches(); + ) + .get_matches(); let port = value_t!(matches, "port", u16).unwrap_or(9090); let protocol = matches.value_of("protocol").unwrap_or("compact"); @@ -61,9 +63,8 @@ fn run() -> thrift::Result<()> { println!("binding to {}", listen_address); - let (i_transport_factory, o_transport_factory): (Box<TTransportFactory>, - Box<TTransportFactory>) = - (Box::new(TFramedTransportFactory {}), Box::new(TFramedTransportFactory {})); + let r_transport_factory = TFramedReadTransportFactory::new(); + let w_transport_factory = TFramedWriteTransportFactory::new(); let (i_protocol_factory, o_protocol_factory): (Box<TInputProtocolFactory>, Box<TOutputProtocolFactory>) = @@ -93,51 +94,75 @@ fn run() -> thrift::Result<()> { // Since what I'm doing is uncommon I'm just going to duplicate the code match &*service { "part" => { - run_meal_server(&listen_address, - i_transport_factory, - i_protocol_factory, - o_transport_factory, - o_protocol_factory) + run_meal_server( + &listen_address, + r_transport_factory, + i_protocol_factory, + w_transport_factory, + o_protocol_factory, + ) } "full" => { - run_full_meal_server(&listen_address, - i_transport_factory, - i_protocol_factory, - o_transport_factory, - o_protocol_factory) + run_full_meal_server( + &listen_address, + r_transport_factory, + i_protocol_factory, + w_transport_factory, + o_protocol_factory, + ) } unknown => Err(format!("unsupported service type {}", unknown).into()), } } -fn run_meal_server(listen_address: &str, - i_transport_factory: Box<TTransportFactory>, - i_protocol_factory: Box<TInputProtocolFactory>, - o_transport_factory: Box<TTransportFactory>, - o_protocol_factory: Box<TOutputProtocolFactory>) - -> thrift::Result<()> { +fn run_meal_server<RTF, IPF, WTF, OPF>( + listen_address: &str, + r_transport_factory: RTF, + i_protocol_factory: IPF, + w_transport_factory: WTF, + o_protocol_factory: OPF, +) -> thrift::Result<()> +where + RTF: TReadTransportFactory + 'static, + IPF: TInputProtocolFactory + 'static, + WTF: TWriteTransportFactory + 'static, + OPF: TOutputProtocolFactory + 'static, +{ let processor = MealServiceSyncProcessor::new(PartHandler {}); - let mut server = TSimpleServer::new(i_transport_factory, - i_protocol_factory, - o_transport_factory, - o_protocol_factory, - processor); + let mut server = TServer::new( + r_transport_factory, + i_protocol_factory, + w_transport_factory, + o_protocol_factory, + processor, + 1, + ); server.listen(listen_address) } -fn run_full_meal_server(listen_address: &str, - i_transport_factory: Box<TTransportFactory>, - i_protocol_factory: Box<TInputProtocolFactory>, - o_transport_factory: Box<TTransportFactory>, - o_protocol_factory: Box<TOutputProtocolFactory>) - -> thrift::Result<()> { +fn run_full_meal_server<RTF, IPF, WTF, OPF>( + listen_address: &str, + r_transport_factory: RTF, + i_protocol_factory: IPF, + w_transport_factory: WTF, + o_protocol_factory: OPF, +) -> thrift::Result<()> +where + RTF: TReadTransportFactory + 'static, + IPF: TInputProtocolFactory + 'static, + WTF: TWriteTransportFactory + 'static, + OPF: TOutputProtocolFactory + 'static, +{ let processor = FullMealAndDrinksServiceSyncProcessor::new(FullHandler {}); - let mut server = TSimpleServer::new(i_transport_factory, - i_protocol_factory, - o_transport_factory, - o_protocol_factory, - processor); + let mut server = TServer::new( + r_transport_factory, + i_protocol_factory, + w_transport_factory, + o_protocol_factory, + processor, + 1, + ); server.listen(listen_address) } @@ -145,21 +170,21 @@ fn run_full_meal_server(listen_address: &str, struct PartHandler; impl MealServiceSyncHandler for PartHandler { - fn handle_meal(&mut self) -> thrift::Result<Meal> { + fn handle_meal(&self) -> thrift::Result<Meal> { println!("part: handling meal call"); Ok(meal()) } } impl RamenServiceSyncHandler for PartHandler { - fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> { + fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> { println!("part: handling ramen call"); Ok(ramen()) } } impl NapkinServiceSyncHandler for PartHandler { - fn handle_napkin(&mut self) -> thrift::Result<Napkin> { + fn handle_napkin(&self) -> thrift::Result<Napkin> { println!("part: handling napkin call"); Ok(napkin()) } @@ -171,34 +196,34 @@ impl NapkinServiceSyncHandler for PartHandler { struct FullHandler; impl FullMealAndDrinksServiceSyncHandler for FullHandler { - fn handle_full_meal_and_drinks(&mut self) -> thrift::Result<FullMealAndDrinks> { + fn handle_full_meal_and_drinks(&self) -> thrift::Result<FullMealAndDrinks> { Ok(FullMealAndDrinks::new(full_meal(), Drink::WHISKEY)) } } impl FullMealServiceSyncHandler for FullHandler { - fn handle_full_meal(&mut self) -> thrift::Result<FullMeal> { + fn handle_full_meal(&self) -> thrift::Result<FullMeal> { println!("full: handling full meal call"); Ok(full_meal()) } } impl MealServiceSyncHandler for FullHandler { - fn handle_meal(&mut self) -> thrift::Result<Meal> { + fn handle_meal(&self) -> thrift::Result<Meal> { println!("full: handling meal call"); Ok(meal()) } } impl RamenServiceSyncHandler for FullHandler { - fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> { + fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> { println!("full: handling ramen call"); Ok(ramen()) } } impl NapkinServiceSyncHandler for FullHandler { - fn handle_napkin(&mut self) -> thrift::Result<Napkin> { + fn handle_napkin(&self) -> thrift::Result<Napkin> { println!("full: handling napkin call"); Ok(napkin()) } http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/lib/rs/test/src/lib.rs ---------------------------------------------------------------------- diff --git a/lib/rs/test/src/lib.rs b/lib/rs/test/src/lib.rs index 8a7ccd0..53f4873 100644 --- a/lib/rs/test/src/lib.rs +++ b/lib/rs/test/src/lib.rs @@ -48,6 +48,9 @@ mod tests { #[test] fn must_be_able_to_use_defaults() { - let _ = midlayer::Meal { noodle: Some(base_one::Noodle::default()), ..Default::default() }; + let _ = midlayer::Meal { + noodle: Some(base_one::Noodle::default()), + ..Default::default() + }; } } http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/test/rs/src/bin/test_client.rs ---------------------------------------------------------------------- diff --git a/test/rs/src/bin/test_client.rs b/test/rs/src/bin/test_client.rs index a2ea832..aad78a0 100644 --- a/test/rs/src/bin/test_client.rs +++ b/test/rs/src/bin/test_client.rs @@ -22,14 +22,14 @@ extern crate thrift; extern crate thrift_test; // huh. I have to do this to use my lib use ordered_float::OrderedFloat; -use std::cell::RefCell; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Debug; -use std::rc::Rc; use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, TCompactOutputProtocol, TInputProtocol, TOutputProtocol}; -use thrift::transport::{TBufferedTransport, TFramedTransport, TTcpTransport, TTransport}; +use thrift::transport::{ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, + TFramedReadTransport, TFramedWriteTransport, TIoChannel, TReadTransport, + TTcpChannel, TWriteTransport, WriteHalf}; use thrift_test::*; fn main() { @@ -58,7 +58,8 @@ fn run() -> thrift::Result<()> { (@arg transport: --transport +takes_value "Thrift transport implementation to use (\"buffered\", \"framed\")") (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")") (@arg testloops: -n --testloops +takes_value "Number of times to run tests") - ).get_matches(); + ) + .get_matches(); let host = matches.value_of("host").unwrap_or("127.0.0.1"); let port = value_t!(matches, "port", u16).unwrap_or(9090); @@ -66,32 +67,39 @@ fn run() -> thrift::Result<()> { let transport = matches.value_of("transport").unwrap_or("buffered"); let protocol = matches.value_of("protocol").unwrap_or("binary"); - let t = open_tcp_transport(host, port)?; + let (i_chan, o_chan) = tcp_channel(host, port)?; - let t: Box<TTransport> = match transport { - "buffered" => Box::new(TBufferedTransport::new(t)), - "framed" => Box::new(TFramedTransport::new(t)), + let (i_tran, o_tran) = match transport { + "buffered" => { + (Box::new(TBufferedReadTransport::new(i_chan)) as Box<TReadTransport>, + Box::new(TBufferedWriteTransport::new(o_chan)) as Box<TWriteTransport>) + } + "framed" => { + (Box::new(TFramedReadTransport::new(i_chan)) as Box<TReadTransport>, + Box::new(TFramedWriteTransport::new(o_chan)) as Box<TWriteTransport>) + } unmatched => return Err(format!("unsupported transport {}", unmatched).into()), }; - let t = Rc::new(RefCell::new(t)); let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol { "binary" => { - (Box::new(TBinaryInputProtocol::new(t.clone(), true)), - Box::new(TBinaryOutputProtocol::new(t.clone(), true))) + (Box::new(TBinaryInputProtocol::new(i_tran, true)), + Box::new(TBinaryOutputProtocol::new(o_tran, true))) } "compact" => { - (Box::new(TCompactInputProtocol::new(t.clone())), - Box::new(TCompactOutputProtocol::new(t.clone()))) + (Box::new(TCompactInputProtocol::new(i_tran)), + Box::new(TCompactOutputProtocol::new(o_tran))) } unmatched => return Err(format!("unsupported protocol {}", unmatched).into()), }; - println!("connecting to {}:{} with {}+{} stack", - host, - port, - protocol, - transport); + println!( + "connecting to {}:{} with {}+{} stack", + host, + port, + protocol, + transport + ); let mut client = ThriftTestSyncClient::new(i_prot, o_prot); @@ -102,16 +110,19 @@ fn run() -> thrift::Result<()> { Ok(()) } -// FIXME: expose "open" through the client interface so I don't have to early open the transport -fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> { - let mut t = TTcpTransport::new(); - match t.open(&format!("{}:{}", host, port)) { - Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))), - Err(e) => Err(e), - } +// FIXME: expose "open" through the client interface so I don't have to early +// open +fn tcp_channel( + host: &str, + port: u16, +) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> { + let mut c = TTcpChannel::new(); + c.open(&format!("{}:{}", host, port))?; + c.split() } -fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Error> { +fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<TOutputProtocol>>,) + -> Result<(), thrift::Error> { println!("testVoid"); client.test_void()?; @@ -131,12 +142,15 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er verify_expected_result(client.test_i32(1159348374), 1159348374)?; println!("testi64"); - // try!(verify_expected_result(client.test_i64(-8651829879438294565), -8651829879438294565)); + // try!(verify_expected_result(client.test_i64(-8651829879438294565), + // -8651829879438294565)); verify_expected_result(client.test_i64(i64::min_value()), i64::min_value())?; println!("testDouble"); - verify_expected_result(client.test_double(OrderedFloat::from(42.42)), - OrderedFloat::from(42.42))?; + verify_expected_result( + client.test_double(OrderedFloat::from(42.42)), + OrderedFloat::from(42.42), + )?; println!("testTypedef"); { @@ -175,10 +189,14 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er } // Xtruct again, with optional values - // FIXME: apparently the erlang thrift server does not like opt-in-req-out parameters that are undefined. Joy. + // FIXME: apparently the erlang thrift server does not like opt-in-req-out + // parameters that are undefined. Joy. // { - // let x_snd = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: None, i32_thing: None, i64_thing: Some(12938492818) }; - // let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: Some(0), i32_thing: Some(0), i64_thing: Some(12938492818) }; // the C++ server is responding correctly + // let x_snd = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: None, + // i32_thing: None, i64_thing: Some(12938492818) }; + // let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: + // Some(0), i32_thing: Some(0), i64_thing: Some(12938492818) }; // the C++ + // server is responding correctly // try!(verify_expected_result(client.test_struct(x_snd), x_cmp)); // } // @@ -188,22 +206,26 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er { let x_snd = Xtruct2 { byte_thing: Some(32), - struct_thing: Some(Xtruct { - string_thing: Some("foo".to_owned()), - byte_thing: Some(1), - i32_thing: Some(324382098), - i64_thing: Some(12938492818), - }), + struct_thing: Some( + Xtruct { + string_thing: Some("foo".to_owned()), + byte_thing: Some(1), + i32_thing: Some(324382098), + i64_thing: Some(12938492818), + }, + ), i32_thing: Some(293481098), }; let x_cmp = Xtruct2 { byte_thing: Some(32), - struct_thing: Some(Xtruct { - string_thing: Some("foo".to_owned()), - byte_thing: Some(1), - i32_thing: Some(324382098), - i64_thing: Some(12938492818), - }), + struct_thing: Some( + Xtruct { + string_thing: Some("foo".to_owned()), + byte_thing: Some(1), + i32_thing: Some(324382098), + i64_thing: Some(12938492818), + }, + ), i32_thing: Some(293481098), }; verify_expected_result(client.test_nest(x_snd), x_cmp)?; @@ -270,7 +292,8 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er } // nested map - // expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 => 2, 3 => 3, 4 => 4, }, } + // expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 + // => 2, 3 => 3, 4 => 4, }, } println!("testMapMap"); { let mut m_cmp_nested_0: BTreeMap<i32, i32> = BTreeMap::new(); @@ -302,13 +325,10 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er i64_thing: Some(-19234123981), }; - verify_expected_result(client.test_multi(1, - -123948, - -19234123981, - m_snd, - Numberz::EIGHT, - 81), - s_cmp)?; + verify_expected_result( + client.test_multi(1, -123948, -19234123981, m_snd, Numberz::EIGHT, 81), + s_cmp, + )?; } // Insanity @@ -324,24 +344,30 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er arg_map_usermap.insert(Numberz::EIGHT, 19); let mut arg_vec_xtructs: Vec<Xtruct> = Vec::new(); - arg_vec_xtructs.push(Xtruct { - string_thing: Some("foo".to_owned()), - byte_thing: Some(8), - i32_thing: Some(29), - i64_thing: Some(92384), - }); - arg_vec_xtructs.push(Xtruct { - string_thing: Some("bar".to_owned()), - byte_thing: Some(28), - i32_thing: Some(2), - i64_thing: Some(-1281), - }); - arg_vec_xtructs.push(Xtruct { - string_thing: Some("baz".to_owned()), - byte_thing: Some(0), - i32_thing: Some(3948539), - i64_thing: Some(-12938492), - }); + arg_vec_xtructs.push( + Xtruct { + string_thing: Some("foo".to_owned()), + byte_thing: Some(8), + i32_thing: Some(29), + i64_thing: Some(92384), + }, + ); + arg_vec_xtructs.push( + Xtruct { + string_thing: Some("bar".to_owned()), + byte_thing: Some(28), + i32_thing: Some(2), + i64_thing: Some(-1281), + }, + ); + arg_vec_xtructs.push( + Xtruct { + string_thing: Some("baz".to_owned()), + byte_thing: Some(0), + i32_thing: Some(3948539), + i64_thing: Some(-12938492), + }, + ); let mut s_cmp_nested_1: BTreeMap<Numberz, Insanity> = BTreeMap::new(); let insanity = Insanity { @@ -372,7 +398,7 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er Err(thrift::Error::User(ref e)) => { match e.downcast_ref::<Xception>() { Some(x) => Ok(x), - None => Err(thrift::Error::User("did not get expected Xception struct".into())), + None => Err(thrift::Error::User("did not get expected Xception struct".into()),), } } _ => Err(thrift::Error::User("did not get exception".into())), @@ -414,7 +440,7 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er Err(thrift::Error::User(ref e)) => { match e.downcast_ref::<Xception>() { Some(x) => Ok(x), - None => Err(thrift::Error::User("did not get expected Xception struct".into())), + None => Err(thrift::Error::User("did not get expected Xception struct".into()),), } } _ => Err(thrift::Error::User("did not get exception".into())), @@ -435,7 +461,7 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er Err(thrift::Error::User(ref e)) => { match e.downcast_ref::<Xception2>() { Some(x) => Ok(x), - None => Err(thrift::Error::User("did not get expected Xception struct".into())), + None => Err(thrift::Error::User("did not get expected Xception struct".into()),), } } _ => Err(thrift::Error::User("did not get exception".into())), @@ -443,12 +469,17 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er let x_cmp = Xception2 { error_code: Some(2002), - struct_thing: Some(Xtruct { - string_thing: Some("This is an Xception2".to_owned()), - byte_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */ - i32_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */ - i64_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */ - }), + struct_thing: Some( + Xtruct { + string_thing: Some("This is an Xception2".to_owned()), + // since this is an OPT_IN_REQ_OUT field the sender sets a default + byte_thing: Some(0), + // since this is an OPT_IN_REQ_OUT field the sender sets a default + i32_thing: Some(0), + // since this is an OPT_IN_REQ_OUT field the sender sets a default + i64_thing: Some(0), + }, + ), }; verify_expected_result(Ok(x), &x_cmp)?; @@ -458,17 +489,18 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er { let r = client.test_multi_exception("haha".to_owned(), "RETURNED".to_owned()); let x = match r { - Err(e) => { - Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into())) - } + Err(e) => Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into(),),), _ => r, }?; let x_cmp = Xtruct { string_thing: Some("RETURNED".to_owned()), - byte_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default - i32_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default - i64_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default + // since this is an OPT_IN_REQ_OUT field the sender sets a default + byte_thing: Some(0), + // since this is an OPT_IN_REQ_OUT field the sender sets a default + i32_thing: Some(0), + // since this is an OPT_IN_REQ_OUT field the sender sets a default + i64_thing: Some(0), }; verify_expected_result(Ok(x), x_cmp)?; @@ -479,20 +511,22 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Er client.test_oneway(1)?; } - // final test to verify that the connection is still writable after the one-way call + // final test to verify that the connection is still writable after the one-way + // call client.test_void() } -fn verify_expected_result<T: Debug + PartialEq + Sized>(actual: Result<T, thrift::Error>, - expected: T) - -> Result<(), thrift::Error> { +#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] +fn verify_expected_result<T: Debug + PartialEq + Sized>( + actual: Result<T, thrift::Error>, + expected: T, +) -> Result<(), thrift::Error> { match actual { Ok(v) => { if v == expected { Ok(()) } else { - Err(thrift::Error::User(format!("expected {:?} but got {:?}", &expected, &v) - .into())) + Err(thrift::Error::User(format!("expected {:?} but got {:?}", &expected, &v).into()),) } } Err(e) => Err(e), http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/test/rs/src/bin/test_server.rs ---------------------------------------------------------------------- diff --git a/test/rs/src/bin/test_server.rs b/test/rs/src/bin/test_server.rs index 613cd55..9c738ab 100644 --- a/test/rs/src/bin/test_server.rs +++ b/test/rs/src/bin/test_server.rs @@ -29,8 +29,10 @@ use std::time::Duration; use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory, TCompactInputProtocolFactory, TCompactOutputProtocolFactory, TInputProtocolFactory, TOutputProtocolFactory}; -use thrift::server::TSimpleServer; -use thrift::transport::{TBufferedTransportFactory, TFramedTransportFactory, TTransportFactory}; +use thrift::server::TServer; +use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, + TFramedReadTransportFactory, TFramedWriteTransportFactory, + TReadTransportFactory, TWriteTransportFactory}; use thrift_test::*; fn main() { @@ -49,7 +51,6 @@ fn run() -> thrift::Result<()> { // --domain-socket // --named-pipe // --ssl - // --workers let matches = clap_app!(rust_test_client => (version: "1.0") (author: "Apache Thrift Developers <[email protected]>") @@ -57,29 +58,35 @@ fn run() -> thrift::Result<()> { (@arg port: --port +takes_value "port on which the test server listens") (@arg transport: --transport +takes_value "transport implementation to use (\"buffered\", \"framed\")") (@arg protocol: --protocol +takes_value "protocol implementation to use (\"binary\", \"compact\")") - (@arg server_type: --server_type +takes_value "type of server instantiated (\"simple\", \"thread-pool\", \"threaded\", \"non-blocking\")") - ).get_matches(); + (@arg server_type: --server_type +takes_value "type of server instantiated (\"simple\", \"thread-pool\")") + (@arg workers: -n --workers +takes_value "number of thread-pool workers (\"4\")") + ) + .get_matches(); let port = value_t!(matches, "port", u16).unwrap_or(9090); let transport = matches.value_of("transport").unwrap_or("buffered"); let protocol = matches.value_of("protocol").unwrap_or("binary"); - let server_type = matches.value_of("server_type").unwrap_or("simple"); + let server_type = matches.value_of("server_type").unwrap_or("thread-pool"); + let workers = value_t!(matches, "workers", usize).unwrap_or(4); let listen_address = format!("127.0.0.1:{}", port); println!("binding to {}", listen_address); - let (i_transport_factory, o_transport_factory): (Box<TTransportFactory>, - Box<TTransportFactory>) = match &*transport { - "buffered" => { - (Box::new(TBufferedTransportFactory::new()), Box::new(TBufferedTransportFactory::new())) - } - "framed" => { - (Box::new(TFramedTransportFactory::new()), Box::new(TFramedTransportFactory::new())) - } - unknown => { - return Err(format!("unsupported transport type {}", unknown).into()); - } - }; + let (i_transport_factory, o_transport_factory): (Box<TReadTransportFactory>, + Box<TWriteTransportFactory>) = + match &*transport { + "buffered" => { + (Box::new(TBufferedReadTransportFactory::new()), + Box::new(TBufferedWriteTransportFactory::new())) + } + "framed" => { + (Box::new(TFramedReadTransportFactory::new()), + Box::new(TFramedWriteTransportFactory::new())) + } + unknown => { + return Err(format!("unsupported transport type {}", unknown).into()); + } + }; let (i_protocol_factory, o_protocol_factory): (Box<TInputProtocolFactory>, Box<TOutputProtocolFactory>) = @@ -101,11 +108,24 @@ fn run() -> thrift::Result<()> { let mut server = match &*server_type { "simple" => { - TSimpleServer::new(i_transport_factory, - i_protocol_factory, - o_transport_factory, - o_protocol_factory, - processor) + TServer::new( + i_transport_factory, + i_protocol_factory, + o_transport_factory, + o_protocol_factory, + processor, + 1, + ) + } + "thread-pool" => { + TServer::new( + i_transport_factory, + i_protocol_factory, + o_transport_factory, + o_protocol_factory, + processor, + workers, + ) } unknown => { return Err(format!("unsupported server type {}", unknown).into()); @@ -117,95 +137,93 @@ fn run() -> thrift::Result<()> { struct ThriftTestSyncHandlerImpl; impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl { - fn handle_test_void(&mut self) -> thrift::Result<()> { + fn handle_test_void(&self) -> thrift::Result<()> { println!("testVoid()"); Ok(()) } - fn handle_test_string(&mut self, thing: String) -> thrift::Result<String> { + fn handle_test_string(&self, thing: String) -> thrift::Result<String> { println!("testString({})", &thing); Ok(thing) } - fn handle_test_bool(&mut self, thing: bool) -> thrift::Result<bool> { + fn handle_test_bool(&self, thing: bool) -> thrift::Result<bool> { println!("testBool({})", thing); Ok(thing) } - fn handle_test_byte(&mut self, thing: i8) -> thrift::Result<i8> { + fn handle_test_byte(&self, thing: i8) -> thrift::Result<i8> { println!("testByte({})", thing); Ok(thing) } - fn handle_test_i32(&mut self, thing: i32) -> thrift::Result<i32> { + fn handle_test_i32(&self, thing: i32) -> thrift::Result<i32> { println!("testi32({})", thing); Ok(thing) } - fn handle_test_i64(&mut self, thing: i64) -> thrift::Result<i64> { + fn handle_test_i64(&self, thing: i64) -> thrift::Result<i64> { println!("testi64({})", thing); Ok(thing) } - fn handle_test_double(&mut self, - thing: OrderedFloat<f64>) - -> thrift::Result<OrderedFloat<f64>> { + fn handle_test_double(&self, thing: OrderedFloat<f64>) -> thrift::Result<OrderedFloat<f64>> { println!("testDouble({})", thing); Ok(thing) } - fn handle_test_binary(&mut self, thing: Vec<u8>) -> thrift::Result<Vec<u8>> { + fn handle_test_binary(&self, thing: Vec<u8>) -> thrift::Result<Vec<u8>> { println!("testBinary({:?})", thing); Ok(thing) } - fn handle_test_struct(&mut self, thing: Xtruct) -> thrift::Result<Xtruct> { + fn handle_test_struct(&self, thing: Xtruct) -> thrift::Result<Xtruct> { println!("testStruct({:?})", thing); Ok(thing) } - fn handle_test_nest(&mut self, thing: Xtruct2) -> thrift::Result<Xtruct2> { + fn handle_test_nest(&self, thing: Xtruct2) -> thrift::Result<Xtruct2> { println!("testNest({:?})", thing); Ok(thing) } - fn handle_test_map(&mut self, thing: BTreeMap<i32, i32>) -> thrift::Result<BTreeMap<i32, i32>> { + fn handle_test_map(&self, thing: BTreeMap<i32, i32>) -> thrift::Result<BTreeMap<i32, i32>> { println!("testMap({:?})", thing); Ok(thing) } - fn handle_test_string_map(&mut self, - thing: BTreeMap<String, String>) - -> thrift::Result<BTreeMap<String, String>> { + fn handle_test_string_map( + &self, + thing: BTreeMap<String, String>, + ) -> thrift::Result<BTreeMap<String, String>> { println!("testStringMap({:?})", thing); Ok(thing) } - fn handle_test_set(&mut self, thing: BTreeSet<i32>) -> thrift::Result<BTreeSet<i32>> { + fn handle_test_set(&self, thing: BTreeSet<i32>) -> thrift::Result<BTreeSet<i32>> { println!("testSet({:?})", thing); Ok(thing) } - fn handle_test_list(&mut self, thing: Vec<i32>) -> thrift::Result<Vec<i32>> { + fn handle_test_list(&self, thing: Vec<i32>) -> thrift::Result<Vec<i32>> { println!("testList({:?})", thing); Ok(thing) } - fn handle_test_enum(&mut self, thing: Numberz) -> thrift::Result<Numberz> { + fn handle_test_enum(&self, thing: Numberz) -> thrift::Result<Numberz> { println!("testEnum({:?})", thing); Ok(thing) } - fn handle_test_typedef(&mut self, thing: UserId) -> thrift::Result<UserId> { + fn handle_test_typedef(&self, thing: UserId) -> thrift::Result<UserId> { println!("testTypedef({})", thing); Ok(thing) } /// @return map<i32,map<i32,i32>> - returns a dictionary with these values: - /// {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 => 2, 3 => 3, 4 => 4, }, } - fn handle_test_map_map(&mut self, - hello: i32) - -> thrift::Result<BTreeMap<i32, BTreeMap<i32, i32>>> { + /// {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 => + /// 2, 3 => 3, 4 => 4, }, } + fn handle_test_map_map(&self, hello: i32) -> thrift::Result<BTreeMap<i32, BTreeMap<i32, i32>>> { println!("testMapMap({})", hello); let mut inner_map_0: BTreeMap<i32, i32> = BTreeMap::new(); @@ -232,9 +250,10 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl { /// 2 => { 6 => <empty Insanity struct>, }, /// } /// return map<UserId, map<Numberz,Insanity>> - a map with the above values - fn handle_test_insanity(&mut self, - argument: Insanity) - -> thrift::Result<BTreeMap<UserId, BTreeMap<Numberz, Insanity>>> { + fn handle_test_insanity( + &self, + argument: Insanity, + ) -> thrift::Result<BTreeMap<UserId, BTreeMap<Numberz, Insanity>>> { println!("testInsanity({:?})", argument); let mut map_0: BTreeMap<Numberz, Insanity> = BTreeMap::new(); map_0.insert(Numberz::TWO, argument.clone()); @@ -254,15 +273,18 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl { Ok(ret) } - /// returns an Xtruct with string_thing = "Hello2", byte_thing = arg0, i32_thing = arg1 and i64_thing = arg2 - fn handle_test_multi(&mut self, - arg0: i8, - arg1: i32, - arg2: i64, - _: BTreeMap<i16, String>, - _: Numberz, - _: UserId) - -> thrift::Result<Xtruct> { + /// returns an Xtruct with: + /// string_thing = "Hello2", byte_thing = arg0, i32_thing = arg1 and + /// i64_thing = arg2 + fn handle_test_multi( + &self, + arg0: i8, + arg1: i32, + arg2: i64, + _: BTreeMap<i16, String>, + _: Numberz, + _: UserId, + ) -> thrift::Result<Xtruct> { let x_ret = Xtruct { string_thing: Some("Hello2".to_owned()), byte_thing: Some(arg0), @@ -273,64 +295,77 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl { Ok(x_ret) } - /// if arg == "Xception" throw Xception with errorCode = 1001 and message = arg + /// if arg == "Xception" throw Xception with errorCode = 1001 and message = + /// arg /// else if arg == "TException" throw TException /// else do not throw anything - fn handle_test_exception(&mut self, arg: String) -> thrift::Result<()> { + fn handle_test_exception(&self, arg: String) -> thrift::Result<()> { println!("testException({})", arg); match &*arg { "Xception" => { - Err((Xception { - error_code: Some(1001), - message: Some(arg), - }) - .into()) + Err( + (Xception { + error_code: Some(1001), + message: Some(arg), + }) + .into(), + ) } "TException" => Err("this is a random error".into()), _ => Ok(()), } } - /// if arg0 == "Xception" throw Xception with errorCode = 1001 and message = "This is an Xception" - /// else if arg0 == "Xception2" throw Xception2 with errorCode = 2002 and struct_thing.string_thing = "This is an Xception2" - // else do not throw anything and return Xtruct with string_thing = arg1 - fn handle_test_multi_exception(&mut self, - arg0: String, - arg1: String) - -> thrift::Result<Xtruct> { + /// if arg0 == "Xception": + /// throw Xception with errorCode = 1001 and message = "This is an + /// Xception" + /// else if arg0 == "Xception2": + /// throw Xception2 with errorCode = 2002 and struct_thing.string_thing = + /// "This is an Xception2" + // else: + // do not throw anything and return Xtruct with string_thing = arg1 + fn handle_test_multi_exception(&self, arg0: String, arg1: String) -> thrift::Result<Xtruct> { match &*arg0 { "Xception" => { - Err((Xception { - error_code: Some(1001), - message: Some("This is an Xception".to_owned()), - }) - .into()) + Err( + (Xception { + error_code: Some(1001), + message: Some("This is an Xception".to_owned()), + }) + .into(), + ) } "Xception2" => { - Err((Xception2 { - error_code: Some(2002), - struct_thing: Some(Xtruct { - string_thing: Some("This is an Xception2".to_owned()), - byte_thing: None, - i32_thing: None, - i64_thing: None, - }), - }) - .into()) + Err( + (Xception2 { + error_code: Some(2002), + struct_thing: Some( + Xtruct { + string_thing: Some("This is an Xception2".to_owned()), + byte_thing: None, + i32_thing: None, + i64_thing: None, + }, + ), + }) + .into(), + ) } _ => { - Ok(Xtruct { - string_thing: Some(arg1), - byte_thing: None, - i32_thing: None, - i64_thing: None, - }) + Ok( + Xtruct { + string_thing: Some(arg1), + byte_thing: None, + i32_thing: None, + i64_thing: None, + }, + ) } } } - fn handle_test_oneway(&mut self, seconds_to_sleep: i32) -> thrift::Result<()> { + fn handle_test_oneway(&self, seconds_to_sleep: i32) -> thrift::Result<()> { thread::sleep(Duration::from_secs(seconds_to_sleep as u64)); Ok(()) } http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/tutorial/rs/README.md ---------------------------------------------------------------------- diff --git a/tutorial/rs/README.md b/tutorial/rs/README.md index 4d0d7c8..384e9f8 100644 --- a/tutorial/rs/README.md +++ b/tutorial/rs/README.md @@ -35,13 +35,12 @@ extern crate thrift; extern crate try_from; // generated Rust module -mod tutorial; +use tutorial; -use std::cell::RefCell; -use std::rc::Rc; -use thrift::protocol::{TInputProtocol, TOutputProtocol}; use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol}; -use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; +use thrift::protocol::{TInputProtocol, TOutputProtocol}; +use thrift::transport::{TFramedReadTransport, TFramedWriteTransport}; +use thrift::transport::{TIoChannel, TTcpChannel}; use tutorial::{CalculatorSyncClient, TCalculatorSyncClient}; use tutorial::{Operation, Work}; @@ -61,28 +60,16 @@ fn run() -> thrift::Result<()> { // println!("connect to server on 127.0.0.1:9090"); - let mut t = TTcpTransport::new(); - let t = match t.open("127.0.0.1:9090") { - Ok(()) => t, - Err(e) => { - return Err( - format!("failed to connect with {:?}", e).into() - ); - } - }; - - let t = Rc::new(RefCell::new( - Box::new(t) as Box<TTransport> - )); - let t = Rc::new(RefCell::new( - Box::new(TFramedTransport::new(t)) as Box<TTransport> - )); + let mut c = TTcpTransport::new(); + c.open("127.0.0.1:9090")?; - let i_prot: Box<TInputProtocol> = Box::new( - TCompactInputProtocol::new(t.clone()) + let (i_chan, o_chan) = c.split()?; + + let i_prot = TCompactInputProtocol::new( + TFramedReadTransport::new(i_chan) ); - let o_prot: Box<TOutputProtocol> = Box::new( - TCompactOutputProtocol::new(t.clone()) + let o_prot = TCompactOutputProtocol::new( + TFramedWriteTransport::new(o_chan) ); let client = CalculatorSyncClient::new(i_prot, o_prot); @@ -177,10 +164,10 @@ A typedef is translated to a `pub type` declaration. ```thrift typedef i64 UserId -typedef map<string, Bonk> MapType +typedef map<string, UserId> MapType ``` ```rust -pub type UserId = 164; +pub type UserId = i64; pub type MapType = BTreeMap<String, Bonk>; ``` @@ -327,4 +314,4 @@ pub struct Foo { ## Known Issues * Struct constants are not supported -* Map, list and set constants require a const holder struct \ No newline at end of file +* Map, list and set constants require a const holder struct http://git-wip-us.apache.org/repos/asf/thrift/blob/0e22c362/tutorial/rs/src/bin/tutorial_client.rs ---------------------------------------------------------------------- diff --git a/tutorial/rs/src/bin/tutorial_client.rs b/tutorial/rs/src/bin/tutorial_client.rs index 2b0d4f9..24ab4be 100644 --- a/tutorial/rs/src/bin/tutorial_client.rs +++ b/tutorial/rs/src/bin/tutorial_client.rs @@ -21,15 +21,12 @@ extern crate clap; extern crate thrift; extern crate thrift_tutorial; -use std::cell::RefCell; -use std::rc::Rc; - -use thrift::protocol::{TInputProtocol, TOutputProtocol}; use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol}; -use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; +use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel, + TTcpChannel, WriteHalf}; use thrift_tutorial::shared::TSharedServiceSyncClient; -use thrift_tutorial::tutorial::{CalculatorSyncClient, TCalculatorSyncClient, Operation, Work}; +use thrift_tutorial::tutorial::{CalculatorSyncClient, Operation, TCalculatorSyncClient, Work}; fn main() { match run() { @@ -73,7 +70,8 @@ fn run() -> thrift::Result<()> { let logid = 32; // let's do...a multiply! - let res = client.calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?; + let res = client + .calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?; println!("multiplied 7 and 8 and got {}", res); // let's get the log for it @@ -102,34 +100,31 @@ fn run() -> thrift::Result<()> { Ok(()) } -fn new_client(host: &str, port: u16) -> thrift::Result<CalculatorSyncClient> { - let mut t = TTcpTransport::new(); +type ClientInputProtocol = TCompactInputProtocol<TFramedReadTransport<ReadHalf<TTcpChannel>>>; +type ClientOutputProtocol = TCompactOutputProtocol<TFramedWriteTransport<WriteHalf<TTcpChannel>>>; + +fn new_client + ( + host: &str, + port: u16, +) -> thrift::Result<CalculatorSyncClient<ClientInputProtocol, ClientOutputProtocol>> { + let mut c = TTcpChannel::new(); // open the underlying TCP stream println!("connecting to tutorial server on {}:{}", host, port); - let t = match t.open(&format!("{}:{}", host, port)) { - Ok(()) => t, - Err(e) => { - return Err(format!("failed to open tcp stream to {}:{} error:{:?}", - host, - port, - e) - .into()); - } - }; - - // refcounted because it's shared by both input and output transports - let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>)); + c.open(&format!("{}:{}", host, port))?; - // wrap a raw socket (slow) with a buffered transport of some kind - let t = Box::new(TFramedTransport::new(t)) as Box<TTransport>; + // clone the TCP channel into two halves, one which + // we'll use for reading, the other for writing + let (i_chan, o_chan) = c.split()?; - // refcounted again because it's shared by both input and output protocols - let t = Rc::new(RefCell::new(t)); + // wrap the raw sockets (slow) with a buffered transport of some kind + let i_tran = TFramedReadTransport::new(i_chan); + let o_tran = TFramedWriteTransport::new(o_chan); // now create the protocol implementations - let i_prot = Box::new(TCompactInputProtocol::new(t.clone())) as Box<TInputProtocol>; - let o_prot = Box::new(TCompactOutputProtocol::new(t.clone())) as Box<TOutputProtocol>; + let i_prot = TCompactInputProtocol::new(i_tran); + let o_prot = TCompactOutputProtocol::new(o_tran); // we're done! Ok(CalculatorSyncClient::new(i_prot, o_prot))
