alamb commented on code in PR #7141:
URL: https://github.com/apache/arrow-datafusion/pull/7141#discussion_r1279873767
##########
datafusion/core/src/physical_plan/insert.rs:
##########
@@ -136,6 +136,18 @@ impl InsertExec {
)))
}
}
+
+ fn make_all_input_streams(
Review Comment:
```suggestion
fn execute_all_input_streams(
```
##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -330,6 +330,8 @@ pub struct FileSinkConfig {
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file
partition
pub file_groups: Vec<PartitionedFile>,
+ /// Vector of partition paths
+ pub table_paths: Vec<ListingTableUrl>,
Review Comment:
🤔 Is this logically different from `file_groups`?
##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -254,7 +254,7 @@ impl MemSink {
impl DataSink for MemSink {
async fn write_all(
&self,
- mut data: SendableRecordBatchStream,
+ mut data: Vec<SendableRecordBatchStream>,
Review Comment:
❤️
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -804,21 +804,25 @@ impl TableProvider for ListingTable {
.await?;
let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
-
- if file_groups.len() > 1 {
- return Err(DataFusionError::Plan(
- "Datafusion currently supports tables from single partition
and/or file."
- .to_owned(),
- ));
+ let writer_mode;
+ //if we are writing a single output_partition to a table backed by a
single file
+ //we can append to that file. Otherwise, we can write new files into
the directory
+ //adding new files to the listing table in order to insert to the
table.
+ let input_partitions = input.output_partitioning().partition_count();
+ if file_groups.len() == 1 && input_partitions == 1 {
Review Comment:
Yes, I agree. I think passing in a `write_options: ListingTableWriteOptions`
structure would be the best API, and that `write_options` could contain
additional information such as the desired save mode, partition by, and ordering.
##########
datafusion/core/src/datasource/physical_plan/mod.rs:
##########
@@ -330,6 +330,8 @@ pub struct FileSinkConfig {
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file
partition
pub file_groups: Vec<PartitionedFile>,
+ /// Vector of partition paths
+ pub table_paths: Vec<ListingTableUrl>,
Review Comment:
🤔 Is this logically different from `file_groups`?
##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -254,7 +254,7 @@ impl MemSink {
impl DataSink for MemSink {
async fn write_all(
&self,
- mut data: SendableRecordBatchStream,
+ mut data: Vec<SendableRecordBatchStream>,
Review Comment:
❤️
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]