This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d9381c66c2 Add `IpcError` variant to replace some uses of
`IoError` that don't have underlying `std::io::Error` (#4726)
d9381c66c2 is described below
commit d9381c66c25f52bb8d6fbd9503947c804a89a37a
Author: Alexandre Crayssac <[email protected]>
AuthorDate: Thu Aug 24 14:06:38 2023 +0200
Add `IpcError` variant to replace some uses of `IoError` that don't have
underlying `std::io::Error` (#4726)
---
arrow-flight/examples/flight_sql_server.rs | 2 +-
arrow-flight/src/bin/flight_sql_client.rs | 10 +++---
arrow-flight/src/sql/client.rs | 16 +++++----
arrow-flight/tests/encode_decode.rs | 2 +-
arrow-ipc/src/convert.rs | 4 +--
arrow-ipc/src/reader.rs | 54 ++++++++++++++++--------------
arrow-ipc/src/writer.rs | 8 ++---
arrow-schema/src/error.rs | 10 +++---
arrow/src/ffi_stream.rs | 2 +-
9 files changed, 58 insertions(+), 50 deletions(-)
diff --git a/arrow-flight/examples/flight_sql_server.rs
b/arrow-flight/examples/flight_sql_server.rs
index 08a36bc49e..1e99957390 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -802,7 +802,7 @@ mod tests {
fn endpoint(uri: String) -> Result<Endpoint, ArrowError> {
let endpoint = Endpoint::new(uri)
- .map_err(|_| ArrowError::IoError("Cannot create
endpoint".to_string()))?
+ .map_err(|_| ArrowError::IpcError("Cannot create
endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't
want packets to wait
diff --git a/arrow-flight/src/bin/flight_sql_client.rs
b/arrow-flight/src/bin/flight_sql_client.rs
index e5aacc2e77..20c8062f89 100644
--- a/arrow-flight/src/bin/flight_sql_client.rs
+++ b/arrow-flight/src/bin/flight_sql_client.rs
@@ -151,7 +151,7 @@ async fn setup_client(
let protocol = if args.tls { "https" } else { "http" };
let mut endpoint = Endpoint::new(format!("{}://{}:{}", protocol,
args.host, port))
- .map_err(|_| ArrowError::IoError("Cannot create
endpoint".to_string()))?
+ .map_err(|_| ArrowError::IpcError("Cannot create
endpoint".to_string()))?
.connect_timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20))
.tcp_nodelay(true) // Disable Nagle's Algorithm since we don't want
packets to wait
@@ -162,15 +162,15 @@ async fn setup_client(
if args.tls {
let tls_config = ClientTlsConfig::new();
- endpoint = endpoint
- .tls_config(tls_config)
- .map_err(|_| ArrowError::IoError("Cannot create TLS
endpoint".to_string()))?;
+ endpoint = endpoint.tls_config(tls_config).map_err(|_| {
+ ArrowError::IpcError("Cannot create TLS endpoint".to_string())
+ })?;
}
let channel = endpoint
.connect()
.await
- .map_err(|e| ArrowError::IoError(format!("Cannot connect to endpoint:
{e}")))?;
+ .map_err(|e| ArrowError::IpcError(format!("Cannot connect to endpoint:
{e}")))?;
let mut client = FlightSqlServiceClient::new(channel);
info!("connected");
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index d661c96409..4b1f38ebcb 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -150,7 +150,7 @@ impl FlightSqlServiceClient<Channel> {
.flight_client
.handshake(req)
.await
- .map_err(|e| ArrowError::IoError(format!("Can't handshake {e}")))?;
+ .map_err(|e| ArrowError::IpcError(format!("Can't handshake
{e}")))?;
if let Some(auth) = resp.metadata().get("authorization") {
let auth = auth.to_str().map_err(|_| {
ArrowError::ParseError("Can't read auth header".to_string())
@@ -390,16 +390,20 @@ impl FlightSqlServiceClient<Channel> {
) -> Result<tonic::Request<T>, ArrowError> {
for (k, v) in &self.headers {
let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
- ArrowError::IoError(format!("Cannot convert header key
\"{k}\": {e}"))
+ ArrowError::ParseError(format!("Cannot convert header key
\"{k}\": {e}"))
})?;
let v = v.parse().map_err(|e| {
- ArrowError::IoError(format!("Cannot convert header value
\"{v}\": {e}"))
+ ArrowError::ParseError(format!(
+ "Cannot convert header value \"{v}\": {e}"
+ ))
})?;
req.metadata_mut().insert(k, v);
}
if let Some(token) = &self.token {
let val = format!("Bearer {token}").parse().map_err(|e| {
- ArrowError::IoError(format!("Cannot convert token to header
value: {e}"))
+ ArrowError::ParseError(format!(
+ "Cannot convert token to header value: {e}"
+ ))
})?;
req.metadata_mut().insert("authorization", val);
}
@@ -504,11 +508,11 @@ impl PreparedStatement<Channel> {
}
fn decode_error_to_arrow_error(err: prost::DecodeError) -> ArrowError {
- ArrowError::IoError(err.to_string())
+ ArrowError::IpcError(err.to_string())
}
fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
- ArrowError::IoError(format!("{status:?}"))
+ ArrowError::IpcError(format!("{status:?}"))
}
// A polymorphic structure to natively represent different types of data
contained in `FlightData`
diff --git a/arrow-flight/tests/encode_decode.rs
b/arrow-flight/tests/encode_decode.rs
index 4f1a8e667f..71bcf4e052 100644
--- a/arrow-flight/tests/encode_decode.rs
+++ b/arrow-flight/tests/encode_decode.rs
@@ -386,7 +386,7 @@ async fn test_mismatched_schema_message() {
do_test(
make_primitive_batch(5),
make_dictionary_batch(3),
- "Error decoding ipc RecordBatch: Io error: Invalid data for schema",
+ "Error decoding ipc RecordBatch: Schema error: Invalid data for
schema",
)
.await;
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 07f716dea8..3569562af2 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -150,12 +150,12 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) ->
Result<Schema, ArrowErr
if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
Ok(schema)
} else {
- Err(ArrowError::IoError(
+ Err(ArrowError::ParseError(
"Unable to get head as schema".to_string(),
))
}
} else {
- Err(ArrowError::IoError(
+ Err(ArrowError::ParseError(
"Unable to get root as message".to_string(),
))
}
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index b7d328977d..962b17c39d 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -138,12 +138,12 @@ fn create_array(reader: &mut ArrayReader, field: &Field)
-> Result<ArrayRef, Arr
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
let dict_id = field.dict_id().ok_or_else(|| {
- ArrowError::IoError(format!("Field {field} does not have dict
id"))
+ ArrowError::ParseError(format!("Field {field} does not have
dict id"))
})?;
let value_array =
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
- ArrowError::IoError(format!(
+ ArrowError::ParseError(format!(
"Cannot find a dictionary batch with dict id:
{dict_id}"
))
})?;
@@ -193,7 +193,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) ->
Result<ArrayRef, Arr
let null_count = node.null_count();
if length != null_count {
- return Err(ArrowError::IoError(format!(
+ return Err(ArrowError::SchemaError(format!(
"Field {field} of NullArray has unequal null_count
{null_count} and len {length}"
)));
}
@@ -325,7 +325,7 @@ impl<'a> ArrayReader<'a> {
fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode,
ArrowError> {
self.nodes.next().ok_or_else(|| {
- ArrowError::IoError(format!(
+ ArrowError::SchemaError(format!(
"Invalid data for schema. {} refers to node not found in
schema",
field
))
@@ -402,10 +402,10 @@ pub fn read_record_batch(
metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
let buffers = batch.buffers().ok_or_else(|| {
- ArrowError::IoError("Unable to get buffers from IPC
RecordBatch".to_string())
+ ArrowError::IpcError("Unable to get buffers from IPC
RecordBatch".to_string())
})?;
let field_nodes = batch.nodes().ok_or_else(|| {
- ArrowError::IoError("Unable to get field nodes from IPC
RecordBatch".to_string())
+ ArrowError::IpcError("Unable to get field nodes from IPC
RecordBatch".to_string())
})?;
let batch_compression = batch.compression();
let compression = batch_compression
@@ -462,7 +462,7 @@ pub fn read_dictionary(
metadata: &crate::MetadataVersion,
) -> Result<(), ArrowError> {
if batch.isDelta() {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::InvalidArgumentError(
"delta dictionary batches not supported".to_string(),
));
}
@@ -569,14 +569,14 @@ impl<R: Read + Seek> FileReader<R> {
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::ParseError(
"Arrow file does not contain correct footer".to_string(),
));
}
@@ -592,11 +592,11 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut footer_data)?;
let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
- ArrowError::IoError(format!("Unable to get root as footer:
{err:?}"))
+ ArrowError::ParseError(format!("Unable to get root as footer:
{err:?}"))
})?;
let blocks = footer.recordBatches().ok_or_else(|| {
- ArrowError::IoError(
+ ArrowError::ParseError(
"Unable to get record batches from IPC Footer".to_string(),
)
})?;
@@ -633,7 +633,9 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut block_data)?;
let message =
crate::root_as_message(&block_data[..]).map_err(|err| {
- ArrowError::IoError(format!("Unable to get root as
message: {err:?}"))
+ ArrowError::ParseError(format!(
+ "Unable to get root as message: {err:?}"
+ ))
})?;
match message.header_type() {
@@ -657,7 +659,7 @@ impl<R: Read + Seek> FileReader<R> {
)?;
}
t => {
- return Err(ArrowError::IoError(format!(
+ return Err(ArrowError::ParseError(format!(
"Expecting DictionaryBatch in dictionary blocks,
found {t:?}."
)));
}
@@ -705,7 +707,7 @@ impl<R: Read + Seek> FileReader<R> {
/// Sets the current block to the index, allowing random reads
pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
if index >= self.total_blocks {
- Err(ArrowError::IoError(format!(
+ Err(ArrowError::InvalidArgumentError(format!(
"Cannot set batch to index {} from {} total batches",
index, self.total_blocks
)))
@@ -732,25 +734,25 @@ impl<R: Read + Seek> FileReader<R> {
let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
- ArrowError::IoError(format!("Unable to get root as footer:
{err:?}"))
+ ArrowError::ParseError(format!("Unable to get root as footer:
{err:?}"))
})?;
// some old test data's footer metadata is not set, so we account for
that
if self.metadata_version != crate::MetadataVersion::V1
&& message.version() != self.metadata_version
{
- return Err(ArrowError::IoError(
+ return Err(ArrowError::IpcError(
"Could not read IPC message as metadata versions
mismatch".to_string(),
));
}
match message.header_type() {
- crate::MessageHeader::Schema => Err(ArrowError::IoError(
+ crate::MessageHeader::Schema => Err(ArrowError::IpcError(
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
- ArrowError::IoError(
+ ArrowError::IpcError(
"Unable to read IPC message as record
batch".to_string(),
)
})?;
@@ -774,7 +776,7 @@ impl<R: Read + Seek> FileReader<R> {
crate::MessageHeader::NONE => {
Ok(None)
}
- t => Err(ArrowError::IoError(format!(
+ t => Err(ArrowError::InvalidArgumentError(format!(
"Reading types other than record batches not yet supported,
unable to read {t:?}"
))),
}
@@ -886,11 +888,11 @@ impl<R: Read> StreamReader<R> {
reader.read_exact(&mut meta_buffer)?;
let message =
crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
- ArrowError::IoError(format!("Unable to get root as message:
{err:?}"))
+ ArrowError::ParseError(format!("Unable to get root as message:
{err:?}"))
})?;
// message header is a Schema, so read it
let ipc_schema: crate::Schema =
message.header_as_schema().ok_or_else(|| {
- ArrowError::IoError("Unable to read IPC message as
schema".to_string())
+ ArrowError::ParseError("Unable to read IPC message as
schema".to_string())
})?;
let schema = crate::convert::fb_to_schema(ipc_schema);
@@ -965,16 +967,16 @@ impl<R: Read> StreamReader<R> {
let vecs = &meta_buffer.to_vec();
let message = crate::root_as_message(vecs).map_err(|err| {
- ArrowError::IoError(format!("Unable to get root as message:
{err:?}"))
+ ArrowError::ParseError(format!("Unable to get root as message:
{err:?}"))
})?;
match message.header_type() {
- crate::MessageHeader::Schema => Err(ArrowError::IoError(
+ crate::MessageHeader::Schema => Err(ArrowError::IpcError(
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
- ArrowError::IoError(
+ ArrowError::IpcError(
"Unable to read IPC message as record
batch".to_string(),
)
})?;
@@ -986,7 +988,7 @@ impl<R: Read> StreamReader<R> {
}
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(||
{
- ArrowError::IoError(
+ ArrowError::IpcError(
"Unable to read IPC message as dictionary
batch".to_string(),
)
})?;
@@ -1004,7 +1006,7 @@ impl<R: Read> StreamReader<R> {
crate::MessageHeader::NONE => {
Ok(None)
}
- t => Err(ArrowError::IoError(
+ t => Err(ArrowError::InvalidArgumentError(
format!("Reading types other than record batches not yet
supported, unable to read {t:?} ")
)),
}
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 1c56613d8f..9c418d76e4 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -757,7 +757,7 @@ impl<W: Write> FileWriter<W> {
/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::IpcError(
"Cannot write record batch to file writer as it is
closed".to_string(),
));
}
@@ -794,7 +794,7 @@ impl<W: Write> FileWriter<W> {
/// Write footer and closing tag, then mark the writer as done
pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::IpcError(
"Cannot write footer to file writer as it is
closed".to_string(),
));
}
@@ -909,7 +909,7 @@ impl<W: Write> StreamWriter<W> {
/// Write a record batch to the stream
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::IpcError(
"Cannot write record batch to stream writer as it is
closed".to_string(),
));
}
@@ -930,7 +930,7 @@ impl<W: Write> StreamWriter<W> {
/// Write continuation bytes, and mark the stream as done
pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
- return Err(ArrowError::IoError(
+ return Err(ArrowError::IpcError(
"Cannot write footer to stream writer as it is
closed".to_string(),
));
}
diff --git a/arrow-schema/src/error.rs b/arrow-schema/src/error.rs
index cd236c0871..8ea533db89 100644
--- a/arrow-schema/src/error.rs
+++ b/arrow-schema/src/error.rs
@@ -35,7 +35,8 @@ pub enum ArrowError {
DivideByZero,
CsvError(String),
JsonError(String),
- IoError(String),
+ IoError(String, std::io::Error),
+ IpcError(String),
InvalidArgumentError(String),
ParquetError(String),
/// Error during import or export to/from the C Data Interface
@@ -53,7 +54,7 @@ impl ArrowError {
impl From<std::io::Error> for ArrowError {
fn from(error: std::io::Error) -> Self {
- ArrowError::IoError(error.to_string())
+ ArrowError::IoError(error.to_string(), error)
}
}
@@ -65,7 +66,7 @@ impl From<std::string::FromUtf8Error> for ArrowError {
impl<W: Write> From<std::io::IntoInnerError<W>> for ArrowError {
fn from(error: std::io::IntoInnerError<W>) -> Self {
- ArrowError::IoError(error.to_string())
+ ArrowError::IoError(error.to_string(), error.into())
}
}
@@ -84,7 +85,8 @@ impl Display for ArrowError {
ArrowError::DivideByZero => write!(f, "Divide by zero error"),
ArrowError::CsvError(desc) => write!(f, "Csv error: {desc}"),
ArrowError::JsonError(desc) => write!(f, "Json error: {desc}"),
- ArrowError::IoError(desc) => write!(f, "Io error: {desc}"),
+ ArrowError::IoError(desc, _) => write!(f, "Io error: {desc}"),
+ ArrowError::IpcError(desc) => write!(f, "Ipc error: {desc}"),
ArrowError::InvalidArgumentError(desc) => {
write!(f, "Invalid argument error: {desc}")
}
diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs
index a9d2e8ab6b..7005cadc62 100644
--- a/arrow/src/ffi_stream.rs
+++ b/arrow/src/ffi_stream.rs
@@ -258,7 +258,7 @@ fn get_error_code(err: &ArrowError) -> i32 {
match err {
ArrowError::NotYetImplemented(_) => ENOSYS,
ArrowError::MemoryError(_) => ENOMEM,
- ArrowError::IoError(_) => EIO,
+ ArrowError::IoError(_, _) => EIO,
_ => EINVAL,
}
}