This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1c9583ab95 Avoid unnecessary copy when reading arrow files (#11840)
1c9583ab95 is described below
commit 1c9583ab95310fb1afa93fec88432ed6536da749
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Fri Aug 9 04:39:44 2024 +0800
Avoid unnecessary copy when reading arrow files (#11840)
* avoid copy
* fmt
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/physical_plan/arrow_file.rs | 11 +++++++++--
datafusion/proto-common/src/from_proto/mod.rs | 4 ++--
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index a1ee6fbe13..b4edc221c1 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -31,6 +31,7 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
};
+use arrow::buffer::Buffer;
use arrow_ipc::reader::FileDecoder;
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
@@ -296,7 +297,10 @@ impl FileOpener for ArrowOpener {
for (dict_block, dict_result) in
footer.dictionaries().iter().flatten().zip(dict_results)
{
- decoder.read_dictionary(dict_block,
&dict_result.into())?;
+ decoder.read_dictionary(
+ dict_block,
+ &Buffer::from_bytes(dict_result.into()),
+ )?;
}
// filter recordbatches according to range
@@ -332,7 +336,10 @@ impl FileOpener for ArrowOpener {
.zip(recordbatch_results)
.filter_map(move |(block, data)| {
decoder
- .read_record_batch(&block, &data.into())
+ .read_record_batch(
+ &block,
+ &Buffer::from_bytes(data.into()),
+ )
.transpose()
}),
)
diff --git a/datafusion/proto-common/src/from_proto/mod.rs
b/datafusion/proto-common/src/from_proto/mod.rs
index 3487f43ae2..feb4c11aa8 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -408,7 +408,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
"Error IPC message while deserializing
ScalarValue::List: {e}"
))
})?;
- let buffer = Buffer::from(arrow_data);
+ let buffer = Buffer::from(arrow_data.as_slice());
let ipc_batch = message.header_as_record_batch().ok_or_else(||
{
Error::General(
@@ -423,7 +423,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
"Error IPC message while deserializing
ScalarValue::List dictionary message: {e}"
))
})?;
- let buffer = Buffer::from(arrow_data);
+ let buffer = Buffer::from(arrow_data.as_slice());
let dict_batch =
message.header_as_dictionary_batch().ok_or_else(|| {
Error::General(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]