This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 258a7cd1cb Support InsertInto Sorted ListingTable (#7743)
258a7cd1cb is described below
commit 258a7cd1cbf20d8f633db6e608649b98dde8b5ce
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sun Oct 8 13:04:35 2023 -0400
Support InsertInto Sorted ListingTable (#7743)
* enable insert into sorted listing table
* add check for single file table
* improve test to verify both order conditions
* address feedback
---
datafusion/core/src/datasource/file_format/csv.rs | 10 ++++-
datafusion/core/src/datasource/file_format/json.rs | 9 +++-
datafusion/core/src/datasource/file_format/mod.rs | 3 +-
.../core/src/datasource/file_format/parquet.rs | 10 ++++-
datafusion/core/src/datasource/listing/table.rs | 48 +++++++++++++++-------
datafusion/core/src/datasource/memory.rs | 1 +
datafusion/core/src/physical_planner.rs | 2 +-
datafusion/physical-plan/src/insert.rs | 19 ++++++---
.../sqllogictest/test_files/insert_to_external.slt | 42 +++++++++++++++++++
9 files changed, 117 insertions(+), 27 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index c3295042b5..4c625b7ed7 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -29,7 +29,7 @@ use arrow::{self, datatypes::SchemaRef};
use arrow_array::RecordBatch;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
@@ -263,6 +263,7 @@ impl FileFormat for CsvFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
+ order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for CSV");
@@ -275,7 +276,12 @@ impl FileFormat for CsvFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(conf));
- Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+ Ok(Arc::new(FileSinkExec::new(
+ input,
+ sink,
+ sink_schema,
+ order_requirements,
+ )) as _)
}
fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 96fd4daa2d..6c260b9802 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -24,6 +24,7 @@ use datafusion_common::not_impl_err;
use datafusion_common::DataFusionError;
use datafusion_common::FileType;
use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortRequirement;
use rand::distributions::Alphanumeric;
use rand::distributions::DistString;
use std::fmt;
@@ -173,6 +174,7 @@ impl FileFormat for JsonFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
+ order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for
Json");
@@ -184,7 +186,12 @@ impl FileFormat for JsonFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
- Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+ Ok(Arc::new(FileSinkExec::new(
+ input,
+ sink,
+ sink_schema,
+ order_requirements,
+ )) as _)
}
fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 86f265ab94..293f062d86 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -41,7 +41,7 @@ use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};
use datafusion_common::{not_impl_err, DataFusionError, FileType};
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
@@ -99,6 +99,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
_conf: FileSinkConfig,
+ _order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 8ddddab71f..062ec1329d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -36,7 +36,7 @@ use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError,
FileType};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
@@ -229,6 +229,7 @@ impl FileFormat for ParquetFormat {
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
+ order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for
Parquet");
@@ -237,7 +238,12 @@ impl FileFormat for ParquetFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(ParquetSink::new(conf));
- Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
+ Ok(Arc::new(FileSinkExec::new(
+ input,
+ sink,
+ sink_schema,
+ order_requirements,
+ )) as _)
}
fn file_type(&self) -> FileType {
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 7834de2b19..e2696d5233 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -57,7 +57,9 @@ use
datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
-use datafusion_physical_expr::{create_physical_expr, LexOrdering,
PhysicalSortExpr};
+use datafusion_physical_expr::{
+ create_physical_expr, LexOrdering, PhysicalSortExpr,
PhysicalSortRequirement,
+};
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
@@ -828,19 +830,6 @@ impl TableProvider for ListingTable {
);
}
- // TODO support inserts to sorted tables which preserve sort_order
- // Inserts currently make no effort to preserve sort_order. This could
lead to
- // incorrect query results on the table after inserting incorrectly
sorted data.
- let unsorted: Vec<Vec<Expr>> = vec![];
- if self.options.file_sort_order != unsorted {
- return Err(
- DataFusionError::NotImplemented(
- "Writing to a sorted listing table via insert into is not
supported yet. \
- To write to this table in the meantime, register an
equivalent table with \
- file_sort_order = vec![]".into())
- );
- }
-
let table_path = &self.table_paths()[0];
// Get the object store for the table path.
let store = state.runtime_env().object_store(table_path)?;
@@ -911,9 +900,38 @@ impl TableProvider for ListingTable {
file_type_writer_options,
};
+ let unsorted: Vec<Vec<Expr>> = vec![];
+ let order_requirements = if self.options().file_sort_order != unsorted
{
+ if matches!(
+ self.options().insert_mode,
+ ListingTableInsertMode::AppendToFile
+ ) {
+ return Err(DataFusionError::Plan(
+ "Cannot insert into a sorted ListingTable with mode
append!".into(),
+ ));
+ }
+ // Multiple sort orders in outer vec are equivalent, so we pass
only the first one
+ let ordering = self
+ .try_create_output_ordering()?
+ .get(0)
+ .ok_or(DataFusionError::Internal(
+ "Expected ListingTable to have a sort order, but none
found!".into(),
+ ))?
+ .clone();
+ // Converts Vec<Vec<SortExpr>> into type required by execution
plan to specify its required input ordering
+ Some(
+ ordering
+ .into_iter()
+ .map(PhysicalSortRequirement::from)
+ .collect::<Vec<_>>(),
+ )
+ } else {
+ None
+ };
+
self.options()
.format
- .create_writer_physical_plan(input, state, config)
+ .create_writer_physical_plan(input, state, config,
order_requirements)
.await
}
}
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index 337a8cabc2..6231bd2c2f 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -223,6 +223,7 @@ impl TableProvider for MemTable {
input,
sink,
self.schema.clone(),
+ None,
)))
}
}
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 84b5b9afa7..35119f374f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -604,7 +604,7 @@ impl DefaultPhysicalPlanner {
FileType::ARROW => Arc::new(ArrowFormat {}),
};
- sink_format.create_writer_physical_plan(input_exec,
session_state, config).await
+ sink_format.create_writer_physical_plan(input_exec,
session_state, config, None).await
}
LogicalPlan::Dml(DmlStatement {
table_name,
diff --git a/datafusion/physical-plan/src/insert.rs
b/datafusion/physical-plan/src/insert.rs
index 8b467461dd..a7b0d32c8e 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -73,6 +73,8 @@ pub struct FileSinkExec {
sink_schema: SchemaRef,
/// Schema describing the structure of the output data.
count_schema: SchemaRef,
+ /// Optional required sort order for output data.
+ sort_order: Option<Vec<PhysicalSortRequirement>>,
}
impl fmt::Debug for FileSinkExec {
@@ -87,12 +89,14 @@ impl FileSinkExec {
input: Arc<dyn ExecutionPlan>,
sink: Arc<dyn DataSink>,
sink_schema: SchemaRef,
+ sort_order: Option<Vec<PhysicalSortRequirement>>,
) -> Self {
Self {
input,
sink,
sink_schema,
count_schema: make_count_schema(),
+ sort_order,
}
}
@@ -192,16 +196,20 @@ impl ExecutionPlan for FileSinkExec {
}
fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
- // Require that the InsertExec gets the data in the order the
+ // The input order is either explicitly set (such as by a ListingTable),
+ // or require that the [FileSinkExec] gets the data in the order the
// input produced it (otherwise the optimizer may chose to reorder
// the input which could result in unintended / poor UX)
//
// More rationale:
//
https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
- vec![self
- .input
- .output_ordering()
- .map(PhysicalSortRequirement::from_sort_exprs)]
+ match &self.sort_order {
+ Some(requirements) => vec![Some(requirements.clone())],
+ None => vec![self
+ .input
+ .output_ordering()
+ .map(PhysicalSortRequirement::from_sort_exprs)],
+ }
}
fn maintains_input_order(&self) -> Vec<bool> {
@@ -221,6 +229,7 @@ impl ExecutionPlan for FileSinkExec {
sink: self.sink.clone(),
sink_schema: self.sink_schema.clone(),
count_schema: self.count_schema.clone(),
+ sort_order: self.sort_order.clone(),
}))
}
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index a29c230a46..d1b73204e3 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -45,6 +45,48 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
statement ok
set datafusion.execution.target_partitions = 8;
+statement ok
+CREATE EXTERNAL TABLE
+ordered_insert_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/insert_to_ordered/'
+WITH ORDER (a ASC, B DESC)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+EXPLAIN INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8),
(7,9), (7,10), (3, 3), (2, 4), (1, 5);
+----
+logical_plan
+Dml: op=[Insert Into] table=[ordered_insert_test]
+--Projection: column1 AS a, column2 AS b
+----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)),
(Int64(7), Int64(8)), (Int64(7), Int64(9))...
+physical_plan
+InsertExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
+--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
+----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
+------ValuesExec
+
+query II
+INSERT INTO ordered_insert_test values (5, 1), (4, 2), (7,7), (7,8), (7,9),
(7,10), (3, 3), (2, 4), (1, 5);
+----
+9
+
+query II
+SELECT * from ordered_insert_test;
+----
+1 5
+2 4
+3 3
+4 2
+5 1
+7 10
+7 9
+7 8
+7 7
+
statement ok
CREATE EXTERNAL TABLE
single_file_test(a bigint, b bigint)