This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d9381c66c2 Add `IpcError` variant to replace some uses of `IoError` that don't have underlying `std::io::Error` (#4726)
d9381c66c2 is described below

commit d9381c66c25f52bb8d6fbd9503947c804a89a37a
Author: Alexandre Crayssac <[email protected]>
AuthorDate: Thu Aug 24 14:06:38 2023 +0200

    Add `IpcError` variant to replace some uses of `IoError` that don't have underlying `std::io::Error` (#4726)
---
 arrow-flight/examples/flight_sql_server.rs |  2 +-
 arrow-flight/src/bin/flight_sql_client.rs  | 10 +++---
 arrow-flight/src/sql/client.rs             | 16 +++++----
 arrow-flight/tests/encode_decode.rs        |  2 +-
 arrow-ipc/src/convert.rs                   |  4 +--
 arrow-ipc/src/reader.rs                    | 54 ++++++++++++++++--------------
 arrow-ipc/src/writer.rs                    |  8 ++---
 arrow-schema/src/error.rs                  | 10 +++---
 arrow/src/ffi_stream.rs                    |  2 +-
 9 files changed, 58 insertions(+), 50 deletions(-)

diff --git a/arrow-flight/examples/flight_sql_server.rs 
b/arrow-flight/examples/flight_sql_server.rs
index 08a36bc49e..1e99957390 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -802,7 +802,7 @@ mod tests {
 
     fn endpoint(uri: String) -> Result<Endpoint, ArrowError> {
         let endpoint = Endpoint::new(uri)
-            .map_err(|_| ArrowError::IoError("Cannot create 
endpoint".to_string()))?
+            .map_err(|_| ArrowError::IpcError("Cannot create 
endpoint".to_string()))?
             .connect_timeout(Duration::from_secs(20))
             .timeout(Duration::from_secs(20))
             .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't 
want packets to wait
diff --git a/arrow-flight/src/bin/flight_sql_client.rs 
b/arrow-flight/src/bin/flight_sql_client.rs
index e5aacc2e77..20c8062f89 100644
--- a/arrow-flight/src/bin/flight_sql_client.rs
+++ b/arrow-flight/src/bin/flight_sql_client.rs
@@ -151,7 +151,7 @@ async fn setup_client(
     let protocol = if args.tls { "https" } else { "http" };
 
     let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol, 
args.host, port))
-        .map_err(|_| ArrowError::IoError("Cannot create 
endpoint".to_string()))?
+        .map_err(|_| ArrowError::IpcError("Cannot create 
endpoint".to_string()))?
         .connect_timeout(Duration::from_secs(20))
         .timeout(Duration::from_secs(20))
         .tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want 
packets to wait
@@ -162,15 +162,15 @@ async fn setup_client(
 
     if args.tls {
         let tls_config = ClientTlsConfig::new();
-        endpoint = endpoint
-            .tls_config(tls_config)
-            .map_err(|_| ArrowError::IoError("Cannot create TLS 
endpoint".to_string()))?;
+        endpoint = endpoint.tls_config(tls_config).map_err(|_| {
+            ArrowError::IpcError("Cannot create TLS endpoint".to_string())
+        })?;
     }
 
     let channel = endpoint
         .connect()
         .await
-        .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint: 
{e}")))?;
+        .map_err(|e| ArrowError::IpcError(format!("Cannot connect to endpoint: 
{e}")))?;
 
     let mut client = FlightSqlServiceClient::new(channel);
     info!("connected");
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index d661c96409..4b1f38ebcb 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -150,7 +150,7 @@ impl FlightSqlServiceClient<Channel> {
             .flight_client
             .handshake(req)
             .await
-            .map_err(|e| ArrowError::IoError(format!("Can't handshake {e}")))?;
+            .map_err(|e| ArrowError::IpcError(format!("Can't handshake 
{e}")))?;
         if let Some(auth) = resp.metadata().get("authorization") {
             let auth = auth.to_str().map_err(|_| {
                 ArrowError::ParseError("Can't read auth header".to_string())
@@ -390,16 +390,20 @@ impl FlightSqlServiceClient<Channel> {
     ) -> Result<tonic::Request<T>, ArrowError> {
         for (k, v) in &self.headers {
             let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
-                ArrowError::IoError(format!("Cannot convert header key 
\"{k}\": {e}"))
+                ArrowError::ParseError(format!("Cannot convert header key 
\"{k}\": {e}"))
             })?;
             let v = v.parse().map_err(|e| {
-                ArrowError::IoError(format!("Cannot convert header value 
\"{v}\": {e}"))
+                ArrowError::ParseError(format!(
+                    "Cannot convert header value \"{v}\": {e}"
+                ))
             })?;
             req.metadata_mut().insert(k, v);
         }
         if let Some(token) = &self.token {
             let val = format!("Bearer {token}").parse().map_err(|e| {
-                ArrowError::IoError(format!("Cannot convert token to header 
value: {e}"))
+                ArrowError::ParseError(format!(
+                    "Cannot convert token to header value: {e}"
+                ))
             })?;
             req.metadata_mut().insert("authorization", val);
         }
@@ -504,11 +508,11 @@ impl PreparedStatement<Channel> {
 }
 
 fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError {
-    ArrowError::IoError(err.to_string())
+    ArrowError::IpcError(err.to_string())
 }
 
 fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
-    ArrowError::IoError(format!("{status:?}"))
+    ArrowError::IpcError(format!("{status:?}"))
 }
 
 // A polymorphic structure to natively represent different types of data 
contained in `FlightData`
diff --git a/arrow-flight/tests/encode_decode.rs 
b/arrow-flight/tests/encode_decode.rs
index 4f1a8e667f..71bcf4e052 100644
--- a/arrow-flight/tests/encode_decode.rs
+++ b/arrow-flight/tests/encode_decode.rs
@@ -386,7 +386,7 @@ async fn test_mismatched_schema_message() {
     do_test(
         make_primitive_batch(5),
         make_dictionary_batch(3),
-        "Error decoding ipc RecordBatch: Io error: Invalid data for schema",
+        "Error decoding ipc RecordBatch: Schema error: Invalid data for 
schema",
     )
     .await;
 
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 07f716dea8..3569562af2 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -150,12 +150,12 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> 
Result<Schema, ArrowErr
         if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
             Ok(schema)
         } else {
-            Err(ArrowError::IoError(
+            Err(ArrowError::ParseError(
                 "Unable to get head as schema".to_string(),
             ))
         }
     } else {
-        Err(ArrowError::IoError(
+        Err(ArrowError::ParseError(
             "Unable to get root as message".to_string(),
         ))
     }
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index b7d328977d..962b17c39d 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -138,12 +138,12 @@ fn create_array(reader: &mut ArrayReader, field: &Field) 
-> Result<ArrayRef, Arr
             let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
 
             let dict_id = field.dict_id().ok_or_else(|| {
-                ArrowError::IoError(format!("Field {field} does not have dict 
id"))
+                ArrowError::ParseError(format!("Field {field} does not have 
dict id"))
             })?;
 
             let value_array =
                 reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
-                    ArrowError::IoError(format!(
+                    ArrowError::ParseError(format!(
                         "Cannot find a dictionary batch with dict id: 
{dict_id}"
                     ))
                 })?;
@@ -193,7 +193,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> 
Result<ArrayRef, Arr
             let null_count = node.null_count();
 
             if length != null_count {
-                return Err(ArrowError::IoError(format!(
+                return Err(ArrowError::SchemaError(format!(
                     "Field {field} of NullArray has unequal null_count 
{null_count} and len {length}"
                 )));
             }
@@ -325,7 +325,7 @@ impl<'a> ArrayReader<'a> {
 
     fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, 
ArrowError> {
         self.nodes.next().ok_or_else(|| {
-            ArrowError::IoError(format!(
+            ArrowError::SchemaError(format!(
                 "Invalid data for schema. {} refers to node not found in 
schema",
                 field
             ))
@@ -402,10 +402,10 @@ pub fn read_record_batch(
     metadata: &MetadataVersion,
 ) -> Result<RecordBatch, ArrowError> {
     let buffers = batch.buffers().ok_or_else(|| {
-        ArrowError::IoError("Unable to get buffers from IPC 
RecordBatch".to_string())
+        ArrowError::IpcError("Unable to get buffers from IPC 
RecordBatch".to_string())
     })?;
     let field_nodes = batch.nodes().ok_or_else(|| {
-        ArrowError::IoError("Unable to get field nodes from IPC 
RecordBatch".to_string())
+        ArrowError::IpcError("Unable to get field nodes from IPC 
RecordBatch".to_string())
     })?;
     let batch_compression = batch.compression();
     let compression = batch_compression
@@ -462,7 +462,7 @@ pub fn read_dictionary(
     metadata: &crate::MetadataVersion,
 ) -> Result<(), ArrowError> {
     if batch.isDelta() {
-        return Err(ArrowError::IoError(
+        return Err(ArrowError::InvalidArgumentError(
             "delta dictionary batches not supported".to_string(),
         ));
     }
@@ -569,14 +569,14 @@ impl<R: Read + Seek> FileReader<R> {
         let mut magic_buffer: [u8; 6] = [0; 6];
         reader.read_exact(&mut magic_buffer)?;
         if magic_buffer != super::ARROW_MAGIC {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::ParseError(
                 "Arrow file does not contain correct header".to_string(),
             ));
         }
         reader.seek(SeekFrom::End(-6))?;
         reader.read_exact(&mut magic_buffer)?;
         if magic_buffer != super::ARROW_MAGIC {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::ParseError(
                 "Arrow file does not contain correct footer".to_string(),
             ));
         }
@@ -592,11 +592,11 @@ impl<R: Read + Seek> FileReader<R> {
         reader.read_exact(&mut footer_data)?;
 
         let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
-            ArrowError::IoError(format!("Unable to get root as footer: 
{err:?}"))
+            ArrowError::ParseError(format!("Unable to get root as footer: 
{err:?}"))
         })?;
 
         let blocks = footer.recordBatches().ok_or_else(|| {
-            ArrowError::IoError(
+            ArrowError::ParseError(
                 "Unable to get record batches from IPC Footer".to_string(),
             )
         })?;
@@ -633,7 +633,9 @@ impl<R: Read + Seek> FileReader<R> {
                 reader.read_exact(&mut block_data)?;
 
                 let message = 
crate::root_as_message(&block_data[..]).map_err(|err| {
-                    ArrowError::IoError(format!("Unable to get root as 
message: {err:?}"))
+                    ArrowError::ParseError(format!(
+                        "Unable to get root as message: {err:?}"
+                    ))
                 })?;
 
                 match message.header_type() {
@@ -657,7 +659,7 @@ impl<R: Read + Seek> FileReader<R> {
                         )?;
                     }
                     t => {
-                        return Err(ArrowError::IoError(format!(
+                        return Err(ArrowError::ParseError(format!(
                             "Expecting DictionaryBatch in dictionary blocks, 
found {t:?}."
                         )));
                     }
@@ -705,7 +707,7 @@ impl<R: Read + Seek> FileReader<R> {
     /// Sets the current block to the index, allowing random reads
     pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
         if index >= self.total_blocks {
-            Err(ArrowError::IoError(format!(
+            Err(ArrowError::InvalidArgumentError(format!(
                 "Cannot set batch to index {} from {} total batches",
                 index, self.total_blocks
             )))
@@ -732,25 +734,25 @@ impl<R: Read + Seek> FileReader<R> {
         let mut block_data = vec![0; meta_len as usize];
         self.reader.read_exact(&mut block_data)?;
         let message = crate::root_as_message(&block_data[..]).map_err(|err| {
-            ArrowError::IoError(format!("Unable to get root as footer: 
{err:?}"))
+            ArrowError::ParseError(format!("Unable to get root as footer: 
{err:?}"))
         })?;
 
         // some old test data's footer metadata is not set, so we account for 
that
         if self.metadata_version != crate::MetadataVersion::V1
             && message.version() != self.metadata_version
         {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::IpcError(
                 "Could not read IPC message as metadata versions 
mismatch".to_string(),
             ));
         }
 
         match message.header_type() {
-            crate::MessageHeader::Schema => Err(ArrowError::IoError(
+            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                 "Not expecting a schema when messages are read".to_string(),
             )),
             crate::MessageHeader::RecordBatch => {
                 let batch = message.header_as_record_batch().ok_or_else(|| {
-                    ArrowError::IoError(
+                    ArrowError::IpcError(
                         "Unable to read IPC message as record 
batch".to_string(),
                     )
                 })?;
@@ -774,7 +776,7 @@ impl<R: Read + Seek> FileReader<R> {
             crate::MessageHeader::NONE => {
                 Ok(None)
             }
-            t => Err(ArrowError::IoError(format!(
+            t => Err(ArrowError::InvalidArgumentError(format!(
                 "Reading types other than record batches not yet supported, 
unable to read {t:?}"
             ))),
         }
@@ -886,11 +888,11 @@ impl<R: Read> StreamReader<R> {
         reader.read_exact(&mut meta_buffer)?;
 
         let message = 
crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
-            ArrowError::IoError(format!("Unable to get root as message: 
{err:?}"))
+            ArrowError::ParseError(format!("Unable to get root as message: 
{err:?}"))
         })?;
         // message header is a Schema, so read it
         let ipc_schema: crate::Schema = 
message.header_as_schema().ok_or_else(|| {
-            ArrowError::IoError("Unable to read IPC message as 
schema".to_string())
+            ArrowError::ParseError("Unable to read IPC message as 
schema".to_string())
         })?;
         let schema = crate::convert::fb_to_schema(ipc_schema);
 
@@ -965,16 +967,16 @@ impl<R: Read> StreamReader<R> {
 
         let vecs = &meta_buffer.to_vec();
         let message = crate::root_as_message(vecs).map_err(|err| {
-            ArrowError::IoError(format!("Unable to get root as message: 
{err:?}"))
+            ArrowError::ParseError(format!("Unable to get root as message: 
{err:?}"))
         })?;
 
         match message.header_type() {
-            crate::MessageHeader::Schema => Err(ArrowError::IoError(
+            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                 "Not expecting a schema when messages are read".to_string(),
             )),
             crate::MessageHeader::RecordBatch => {
                 let batch = message.header_as_record_batch().ok_or_else(|| {
-                    ArrowError::IoError(
+                    ArrowError::IpcError(
                         "Unable to read IPC message as record 
batch".to_string(),
                     )
                 })?;
@@ -986,7 +988,7 @@ impl<R: Read> StreamReader<R> {
             }
             crate::MessageHeader::DictionaryBatch => {
                 let batch = message.header_as_dictionary_batch().ok_or_else(|| 
{
-                    ArrowError::IoError(
+                    ArrowError::IpcError(
                         "Unable to read IPC message as dictionary 
batch".to_string(),
                     )
                 })?;
@@ -1004,7 +1006,7 @@ impl<R: Read> StreamReader<R> {
             crate::MessageHeader::NONE => {
                 Ok(None)
             }
-            t => Err(ArrowError::IoError(
+            t => Err(ArrowError::InvalidArgumentError(
                 format!("Reading types other than record batches not yet 
supported, unable to read {t:?} ")
             )),
         }
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 1c56613d8f..9c418d76e4 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -757,7 +757,7 @@ impl<W: Write> FileWriter<W> {
     /// Write a record batch to the file
     pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         if self.finished {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::IpcError(
                 "Cannot write record batch to file writer as it is 
closed".to_string(),
             ));
         }
@@ -794,7 +794,7 @@ impl<W: Write> FileWriter<W> {
     /// Write footer and closing tag, then mark the writer as done
     pub fn finish(&mut self) -> Result<(), ArrowError> {
         if self.finished {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::IpcError(
                 "Cannot write footer to file writer as it is 
closed".to_string(),
             ));
         }
@@ -909,7 +909,7 @@ impl<W: Write> StreamWriter<W> {
     /// Write a record batch to the stream
     pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         if self.finished {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::IpcError(
                 "Cannot write record batch to stream writer as it is 
closed".to_string(),
             ));
         }
@@ -930,7 +930,7 @@ impl<W: Write> StreamWriter<W> {
     /// Write continuation bytes, and mark the stream as done
     pub fn finish(&mut self) -> Result<(), ArrowError> {
         if self.finished {
-            return Err(ArrowError::IoError(
+            return Err(ArrowError::IpcError(
                 "Cannot write footer to stream writer as it is 
closed".to_string(),
             ));
         }
diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs
index cd236c0871..8ea533db89 100644
--- a/arrow-schema/src/error.rs
+++ b/arrow-schema/src/error.rs
@@ -35,7 +35,8 @@ pub enum ArrowError {
     DivideByZero,
     CsvError(String),
     JsonError(String),
-    IoError(String),
+    IoError(String, std::io::Error),
+    IpcError(String),
     InvalidArgumentError(String),
     ParquetError(String),
     /// Error during import or export to/from the C Data Interface
@@ -53,7 +54,7 @@ impl ArrowError {
 
 impl From<std::io::Error> for ArrowError {
     fn from(error: std::io::Error) -> Self {
-        ArrowError::IoError(error.to_string())
+        ArrowError::IoError(error.to_string(), error)
     }
 }
 
@@ -65,7 +66,7 @@ impl From<std::string::FromUtf8Error> for ArrowError {
 
 impl<W: Write> From<std::io::IntoInnerError<W>> for ArrowError {
     fn from(error: std::io::IntoInnerError<W>) -> Self {
-        ArrowError::IoError(error.to_string())
+        ArrowError::IoError(error.to_string(), error.into())
     }
 }
 
@@ -84,7 +85,8 @@ impl Display for ArrowError {
             ArrowError::DivideByZero => write!(f, "Divide by zero error"),
             ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
             ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
-            ArrowError::IoError(desc) => write!(f, "Io error: {desc}"),
+            ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
+            ArrowError::IpcError(desc) => write!(f, "Ipc error: {desc}"),
             ArrowError::InvalidArgumentError(desc) => {
                 write!(f, "Invalid argument error: {desc}")
             }
diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs
index a9d2e8ab6b..7005cadc62 100644
--- a/arrow/src/ffi_stream.rs
+++ b/arrow/src/ffi_stream.rs
@@ -258,7 +258,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
     match err {
         ArrowError::NotYetImplemented(_) => ENOSYS,
         ArrowError::MemoryError(_) => ENOMEM,
-        ArrowError::IoError(_) => EIO,
+        ArrowError::IoError(_, _) => EIO,
         _ => EINVAL,
     }
 }

Reply via email to