This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d5704f75fc Support Writing Arrow files (#8608)
d5704f75fc is described below
commit d5704f75fc28f88632518ef9a808c9cda38dc162
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sun Dec 24 07:46:26 2023 -0500
Support Writing Arrow files (#8608)
* write arrow files
* update datafusion-cli lock
* fix toml formatting
* Update insert_to_external.slt
Co-authored-by: Andrew Lamb <[email protected]>
* add ticket tracking arrow options
* default to lz4 compression
* update datafusion-cli lock
* cargo update
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
Cargo.toml | 28 +--
datafusion-cli/Cargo.lock | 56 +++---
datafusion/core/Cargo.toml | 1 +
.../core/src/datasource/file_format/arrow.rs | 207 ++++++++++++++++++++-
.../core/src/datasource/file_format/parquet.rs | 34 +---
.../core/src/datasource/file_format/write/mod.rs | 33 +++-
datafusion/sqllogictest/test_files/copy.slt | 56 ++++++
.../sqllogictest/test_files/insert_to_external.slt | 39 ++++
8 files changed, 368 insertions(+), 86 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 023dc6c6fc..a698fbf471 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,24 +17,7 @@
[workspace]
exclude = ["datafusion-cli"]
-members = [
- "datafusion/common",
- "datafusion/core",
- "datafusion/expr",
- "datafusion/execution",
- "datafusion/optimizer",
- "datafusion/physical-expr",
- "datafusion/physical-plan",
- "datafusion/proto",
- "datafusion/proto/gen",
- "datafusion/sql",
- "datafusion/sqllogictest",
- "datafusion/substrait",
- "datafusion/wasmtest",
- "datafusion-examples",
- "docs",
- "test-utils",
- "benchmarks",
+members = ["datafusion/common", "datafusion/core", "datafusion/expr",
"datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr",
"datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen",
"datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait",
"datafusion/wasmtest", "datafusion-examples", "docs", "test-utils",
"benchmarks",
]
resolver = "2"
@@ -53,24 +36,26 @@ arrow = { version = "49.0.0", features = ["prettyprint"] }
arrow-array = { version = "49.0.0", default-features = false, features =
["chrono-tz"] }
arrow-buffer = { version = "49.0.0", default-features = false }
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
+arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
arrow-ord = { version = "49.0.0", default-features = false }
arrow-schema = { version = "49.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
+chrono = { version = "0.4.31", default-features = false }
ctor = "0.2.0"
+dashmap = "5.4.0"
datafusion = { path = "datafusion/core", version = "34.0.0" }
datafusion-common = { path = "datafusion/common", version = "34.0.0" }
+datafusion-execution = { path = "datafusion/execution", version = "34.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "34.0.0" }
-datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "34.0.0" }
datafusion-physical-expr = { path = "datafusion/physical-expr", version =
"34.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version =
"34.0.0" }
-datafusion-execution = { path = "datafusion/execution", version = "34.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "34.0.0" }
+datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version =
"34.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "34.0.0" }
-dashmap = "5.4.0"
doc-comment = "0.3"
env_logger = "0.10"
futures = "0.3"
@@ -88,7 +73,6 @@ serde_json = "1"
sqlparser = { version = "0.40.0", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
-chrono = { version = "0.4.31", default-features = false }
url = "2.2"
[profile.release]
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index ac05ddf10a..9f75013c86 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -255,6 +255,7 @@ dependencies = [
"arrow-data",
"arrow-schema",
"flatbuffers",
+ "lz4_flex",
]
[[package]]
@@ -378,13 +379,13 @@ dependencies = [
[[package]]
name = "async-trait"
-version = "0.1.74"
+version = "0.1.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
+checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -1074,7 +1075,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e"
dependencies = [
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -1104,6 +1105,7 @@ dependencies = [
"apache-avro",
"arrow",
"arrow-array",
+ "arrow-ipc",
"arrow-schema",
"async-compression",
"async-trait",
@@ -1576,7 +1578,7 @@ checksum =
"53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -2496,7 +2498,7 @@ checksum =
"4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -2513,9 +2515,9 @@ checksum =
"8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
-version = "0.3.27"
+version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
+checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a"
[[package]]
name = "powerfmt"
@@ -2586,9 +2588,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.70"
+version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b"
+checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8"
dependencies = [
"unicode-ident",
]
@@ -3020,7 +3022,7 @@ checksum =
"43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -3186,7 +3188,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -3208,9 +3210,9 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.41"
+version = "2.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269"
+checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8"
dependencies = [
"proc-macro2",
"quote",
@@ -3289,7 +3291,7 @@ checksum =
"01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -3357,9 +3359,9 @@ checksum =
"1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.35.0"
+version = "1.35.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c"
+checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
dependencies = [
"backtrace",
"bytes",
@@ -3381,7 +3383,7 @@ checksum =
"5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -3478,7 +3480,7 @@ checksum =
"34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -3523,7 +3525,7 @@ checksum =
"f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
@@ -3677,7 +3679,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
"wasm-bindgen-shared",
]
@@ -3711,7 +3713,7 @@ checksum =
"f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -3960,22 +3962,22 @@ dependencies = [
[[package]]
name = "zerocopy"
-version = "0.7.31"
+version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
+checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
-version = "0.7.31"
+version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a"
+checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.41",
+ "syn 2.0.42",
]
[[package]]
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 0ee83e7567..9de6a7f7d6 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -55,6 +55,7 @@ ahash = { version = "0.8", default-features = false, features
= ["runtime-rng"]
apache-avro = { version = "0.16", optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
+arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz",
"zstd", "futures-io", "tokio"], optional = true }
async-trait = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs
b/datafusion/core/src/datasource/file_format/arrow.rs
index 07c96bdae1..7d393d9129 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -21,10 +21,13 @@
use std::any::Any;
use std::borrow::Cow;
+use std::fmt::{self, Debug};
use std::sync::Arc;
use crate::datasource::file_format::FileFormat;
-use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
+use crate::datasource::physical_plan::{
+ ArrowExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
+};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
@@ -32,16 +35,28 @@ use crate::physical_plan::ExecutionPlan;
use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
use arrow::ipc::root_as_message;
+use arrow_ipc::writer::IpcWriteOptions;
+use arrow_ipc::CompressionType;
use arrow_schema::{ArrowError, Schema, SchemaRef};
use bytes::Bytes;
-use datafusion_common::{FileType, Statistics};
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
+use crate::physical_plan::{DisplayAs, DisplayFormatType};
use async_trait::async_trait;
+use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+use datafusion_physical_plan::metrics::MetricsSet;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
+use tokio::io::AsyncWriteExt;
+use tokio::task::JoinSet;
+
+use super::file_compression_type::FileCompressionType;
+use super::write::demux::start_demuxer_task;
+use super::write::{create_writer, SharedBuffer};
/// Arrow `FileFormat` implementation.
#[derive(Default, Debug)]
@@ -97,11 +112,197 @@ impl FileFormat for ArrowFormat {
Ok(Arc::new(exec))
}
+ async fn create_writer_physical_plan(
+ &self,
+ input: Arc<dyn ExecutionPlan>,
+ _state: &SessionState,
+ conf: FileSinkConfig,
+ order_requirements: Option<Vec<PhysicalSortRequirement>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if conf.overwrite {
+ return not_impl_err!("Overwrites are not implemented yet for Arrow
format");
+ }
+
+ let sink_schema = conf.output_schema().clone();
+ let sink = Arc::new(ArrowFileSink::new(conf));
+
+ Ok(Arc::new(FileSinkExec::new(
+ input,
+ sink,
+ sink_schema,
+ order_requirements,
+ )) as _)
+ }
+
fn file_type(&self) -> FileType {
FileType::ARROW
}
}
+/// Implements [`DataSink`] for writing to arrow_ipc files
+struct ArrowFileSink {
+ config: FileSinkConfig,
+}
+
+impl ArrowFileSink {
+ fn new(config: FileSinkConfig) -> Self {
+ Self { config }
+ }
+
+ /// Converts table schema to writer schema, which may differ in the case
+ /// of hive style partitioning where some columns are removed from the
+ /// underlying files.
+ fn get_writer_schema(&self) -> Arc<Schema> {
+ if !self.config.table_partition_cols.is_empty() {
+ let schema = self.config.output_schema();
+ let partition_names: Vec<_> = self
+ .config
+ .table_partition_cols
+ .iter()
+ .map(|(s, _)| s)
+ .collect();
+ Arc::new(Schema::new(
+ schema
+ .fields()
+ .iter()
+ .filter(|f| !partition_names.contains(&f.name()))
+ .map(|f| (**f).clone())
+ .collect::<Vec<_>>(),
+ ))
+ } else {
+ self.config.output_schema().clone()
+ }
+ }
+}
+
+impl Debug for ArrowFileSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ArrowFileSink").finish()
+ }
+}
+
+impl DisplayAs for ArrowFileSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "ArrowFileSink(file_groups=",)?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for ArrowFileSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
+ }
+
+ async fn write_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ // No props are supported yet, but can be by updating
FileTypeWriterOptions
+ // to populate this struct and use those options to initialize the
arrow_ipc::writer::FileWriter
+ // https://github.com/apache/arrow-datafusion/issues/8635
+ let _arrow_props =
self.config.file_type_writer_options.try_into_arrow()?;
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+
+ let part_col = if !self.config.table_partition_cols.is_empty() {
+ Some(self.config.table_partition_cols.clone())
+ } else {
+ None
+ };
+
+ let (demux_task, mut file_stream_rx) = start_demuxer_task(
+ data,
+ context,
+ part_col,
+ self.config.table_paths[0].clone(),
+ "arrow".into(),
+ self.config.single_file_output,
+ );
+
+ let mut file_write_tasks: JoinSet<std::result::Result<usize,
DataFusionError>> =
+ JoinSet::new();
+
+ let ipc_options =
+ IpcWriteOptions::try_new(64, false,
arrow_ipc::MetadataVersion::V5)?
+ .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+ while let Some((path, mut rx)) = file_stream_rx.recv().await {
+ let shared_buffer = SharedBuffer::new(1048576);
+ let mut arrow_writer =
arrow_ipc::writer::FileWriter::try_new_with_options(
+ shared_buffer.clone(),
+ &self.get_writer_schema(),
+ ipc_options.clone(),
+ )?;
+ let mut object_store_writer = create_writer(
+ FileCompressionType::UNCOMPRESSED,
+ &path,
+ object_store.clone(),
+ )
+ .await?;
+ file_write_tasks.spawn(async move {
+ let mut row_count = 0;
+ while let Some(batch) = rx.recv().await {
+ row_count += batch.num_rows();
+ arrow_writer.write(&batch)?;
+ let mut buff_to_flush =
shared_buffer.buffer.try_lock().unwrap();
+ if buff_to_flush.len() > 1024000 {
+ object_store_writer
+ .write_all(buff_to_flush.as_slice())
+ .await?;
+ buff_to_flush.clear();
+ }
+ }
+ arrow_writer.finish()?;
+ let final_buff = shared_buffer.buffer.try_lock().unwrap();
+
+ object_store_writer.write_all(final_buff.as_slice()).await?;
+ object_store_writer.shutdown().await?;
+ Ok(row_count)
+ });
+ }
+
+ let mut row_count = 0;
+ while let Some(result) = file_write_tasks.join_next().await {
+ match result {
+ Ok(r) => {
+ row_count += r?;
+ }
+ Err(e) => {
+ if e.is_panic() {
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ unreachable!();
+ }
+ }
+ }
+ }
+
+ match demux_task.await {
+ Ok(r) => r?,
+ Err(e) => {
+ if e.is_panic() {
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ unreachable!();
+ }
+ }
+ }
+ Ok(row_count as u64)
+ }
+}
+
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 9db320fb9d..0c813b6ccb 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -29,7 +29,6 @@ use parquet::file::writer::SerializedFileWriter;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
-use std::io::Write;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -56,7 +55,7 @@ use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use super::write::demux::start_demuxer_task;
-use super::write::{create_writer, AbortableWrite};
+use super::write::{create_writer, AbortableWrite, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
@@ -1101,37 +1100,6 @@ async fn output_single_parquet_file_parallelized(
Ok(row_count)
}
-/// A buffer with interior mutability shared by the SerializedFileWriter and
-/// ObjectStore writer
-#[derive(Clone)]
-struct SharedBuffer {
- /// The inner buffer for reading and writing
- ///
- /// The lock is used to obtain internal mutability, so no worry about the
- /// lock contention.
- buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
-}
-
-impl SharedBuffer {
- pub fn new(capacity: usize) -> Self {
- Self {
- buffer:
Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
- }
- }
-}
-
-impl Write for SharedBuffer {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- let mut buffer = self.buffer.try_lock().unwrap();
- Write::write(&mut *buffer, buf)
- }
-
- fn flush(&mut self) -> std::io::Result<()> {
- let mut buffer = self.buffer.try_lock().unwrap();
- Write::flush(&mut *buffer)
- }
-}
-
#[cfg(test)]
pub(crate) mod test_util {
use super::*;
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs
b/datafusion/core/src/datasource/file_format/write/mod.rs
index cfcdbd8c46..68fe81ce91 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -18,7 +18,7 @@
//! Module containing helper methods/traits related to enabling
//! write support for the various file formats
-use std::io::Error;
+use std::io::{Error, Write};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -43,6 +43,37 @@ use tokio::io::AsyncWrite;
pub(crate) mod demux;
pub(crate) mod orchestration;
+/// A buffer with interior mutability shared by the SerializedFileWriter and
+/// ObjectStore writer
+#[derive(Clone)]
+pub(crate) struct SharedBuffer {
+ /// The inner buffer for reading and writing
+ ///
+ /// The lock is used to obtain internal mutability, so no worry about the
+ /// lock contention.
+ pub(crate) buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
+}
+
+impl SharedBuffer {
+ pub fn new(capacity: usize) -> Self {
+ Self {
+ buffer:
Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
+ }
+ }
+}
+
+impl Write for SharedBuffer {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let mut buffer = self.buffer.try_lock().unwrap();
+ Write::write(&mut *buffer, buf)
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ let mut buffer = self.buffer.try_lock().unwrap();
+ Write::flush(&mut *buffer)
+ }
+}
+
/// Stores data needed during abortion of MultiPart writers
#[derive(Clone)]
pub(crate) struct MultiPart {
diff --git a/datafusion/sqllogictest/test_files/copy.slt
b/datafusion/sqllogictest/test_files/copy.slt
index 02ab330833..89b2391788 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -230,6 +230,62 @@ select * from validate_csv_with_options;
1;Foo
2;Bar
+# Copy from table to single arrow file
+query IT
+COPY source_table to 'test_files/scratch/copy/table.arrow';
+----
+2
+
+# Validate single arrow file output
+statement ok
+CREATE EXTERNAL TABLE validate_arrow_file
+STORED AS arrow
+LOCATION 'test_files/scratch/copy/table.arrow';
+
+query IT
+select * from validate_arrow_file;
+----
+1 Foo
+2 Bar
+
+# Copy from dict encoded values to single arrow file
+query T?
+COPY (values
+('c', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('d', arrow_cast('bar',
'Dictionary(Int32, Utf8)')))
+to 'test_files/scratch/copy/table_dict.arrow';
+----
+2
+
+# Validate single arrow file output (dictionary-encoded values)
+statement ok
+CREATE EXTERNAL TABLE validate_arrow_file_dict
+STORED AS arrow
+LOCATION 'test_files/scratch/copy/table_dict.arrow';
+
+query T?
+select * from validate_arrow_file_dict;
+----
+c foo
+d bar
+
+
+# Copy from table to folder of arrow files
+query IT
+COPY source_table to 'test_files/scratch/copy/table_arrow' (format arrow,
single_file_output false);
+----
+2
+
+# Validate arrow output
+statement ok
+CREATE EXTERNAL TABLE validate_arrow STORED AS arrow LOCATION
'test_files/scratch/copy/table_arrow';
+
+query IT
+select * from validate_arrow;
+----
+1 Foo
+2 Bar
+
+
# Error cases:
# Copy from table with options
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index cdaf0bb643..e73778ad44 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -76,6 +76,45 @@ select * from dictionary_encoded_parquet_partitioned order
by (a);
a foo
b bar
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned(
+ a varchar,
+ b varchar,
+)
+STORED AS arrow
+LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
+PARTITIONED BY (b)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+insert into dictionary_encoded_arrow_partitioned
+select * from dictionary_encoded_values
+----
+2
+
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback(
+ a varchar,
+)
+STORED AS arrow
+LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/'
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query T
+select * from dictionary_encoded_arrow_test_readback;
+----
+b
+
+# https://github.com/apache/arrow-datafusion/issues/7816
+query error DataFusion error: Arrow error: Schema error: project index 1 out
of bounds, max field 1
+select * from dictionary_encoded_arrow_partitioned order by (a);
+
# test_insert_into
statement ok