alamb commented on code in PR #8608:
URL: https://github.com/apache/arrow-datafusion/pull/8608#discussion_r1435349817


##########
datafusion/core/src/datasource/file_format/arrow.rs:
##########
@@ -97,11 +110,191 @@ impl FileFormat for ArrowFormat {
         Ok(Arc::new(exec))
     }
 
+    async fn create_writer_physical_plan(
+        &self,
+        input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.overwrite {
+            return not_impl_err!("Overwrites are not implemented yet for Arrow 
format");
+        }
+
+        let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(ArrowFileSink::new(conf));
+
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
+    }
+
     fn file_type(&self) -> FileType {
         FileType::ARROW
     }
 }
 
+/// Implements [`DataSink`] for writing to arrow_ipc files
+struct ArrowFileSink {
+    config: FileSinkConfig,
+}
+
+impl ArrowFileSink {
+    fn new(config: FileSinkConfig) -> Self {
+        Self { config }
+    }
+
+    /// Converts table schema to writer schema, which may differ in the case
+    /// of hive style partitioning where some columns are removed from the
+    /// underlying files.
+    fn get_writer_schema(&self) -> Arc<Schema> {
+        if !self.config.table_partition_cols.is_empty() {
+            let schema = self.config.output_schema();
+            let partition_names: Vec<_> = self
+                .config
+                .table_partition_cols
+                .iter()
+                .map(|(s, _)| s)
+                .collect();
+            Arc::new(Schema::new(
+                schema
+                    .fields()
+                    .iter()
+                    .filter(|f| !partition_names.contains(&f.name()))
+                    .map(|f| (**f).clone())
+                    .collect::<Vec<_>>(),
+            ))
+        } else {
+            self.config.output_schema().clone()
+        }
+    }
+}
+
+impl Debug for ArrowFileSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ArrowFileSink").finish()
+    }
+}
+
+impl DisplayAs for ArrowFileSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ArrowFileSink(file_groups=",)?;
+                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                write!(f, ")")
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl DataSink for ArrowFileSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        // No props are supported yet, but can be by updating 
FileTypeWriterOptions
+        // to populate this struct and use those options to initialize the 
arrow_ipc::writer::FileWriter
+        let _arrow_props = 
self.config.file_type_writer_options.try_into_arrow()?;

Review Comment:
   I think we should track this as a follow on ticket and ideally leave a 
comment in the code pointing to the ticket so it eventually gets cleaned up



##########
datafusion/core/src/datasource/file_format/arrow.rs:
##########
@@ -97,11 +110,191 @@ impl FileFormat for ArrowFormat {
         Ok(Arc::new(exec))
     }
 
+    async fn create_writer_physical_plan(
+        &self,
+        input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.overwrite {
+            return not_impl_err!("Overwrites are not implemented yet for Arrow 
format");
+        }
+
+        let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(ArrowFileSink::new(conf));
+
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
+    }
+
     fn file_type(&self) -> FileType {
         FileType::ARROW
     }
 }
 
+/// Implements [`DataSink`] for writing to arrow_ipc files
+struct ArrowFileSink {
+    config: FileSinkConfig,
+}
+
+impl ArrowFileSink {
+    fn new(config: FileSinkConfig) -> Self {
+        Self { config }
+    }
+
+    /// Converts table schema to writer schema, which may differ in the case
+    /// of hive style partitioning where some columns are removed from the
+    /// underlying files.
+    fn get_writer_schema(&self) -> Arc<Schema> {
+        if !self.config.table_partition_cols.is_empty() {
+            let schema = self.config.output_schema();
+            let partition_names: Vec<_> = self
+                .config
+                .table_partition_cols
+                .iter()
+                .map(|(s, _)| s)
+                .collect();
+            Arc::new(Schema::new(
+                schema
+                    .fields()
+                    .iter()
+                    .filter(|f| !partition_names.contains(&f.name()))
+                    .map(|f| (**f).clone())
+                    .collect::<Vec<_>>(),
+            ))
+        } else {
+            self.config.output_schema().clone()
+        }
+    }
+}
+
+impl Debug for ArrowFileSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ArrowFileSink").finish()
+    }
+}
+
+impl DisplayAs for ArrowFileSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ArrowFileSink(file_groups=",)?;
+                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                write!(f, ")")
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl DataSink for ArrowFileSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        // No props are supported yet, but can be by updating 
FileTypeWriterOptions
+        // to populate this struct and use those options to initialize the 
arrow_ipc::writer::FileWriter
+        let _arrow_props = 
self.config.file_type_writer_options.try_into_arrow()?;
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
+        let part_col = if !self.config.table_partition_cols.is_empty() {
+            Some(self.config.table_partition_cols.clone())
+        } else {
+            None
+        };
+
+        let (demux_task, mut file_stream_rx) = start_demuxer_task(
+            data,
+            context,
+            part_col,
+            self.config.table_paths[0].clone(),
+            "arrow".into(),
+            self.config.single_file_output,
+        );
+
+        let mut file_write_tasks: JoinSet<std::result::Result<usize, 
DataFusionError>> =
+            JoinSet::new();
+        while let Some((path, mut rx)) = file_stream_rx.recv().await {
+            let shared_buffer = SharedBuffer::new(1048576);

Review Comment:
   Does this mean that if any record batch takes more than 1MB to write out 
we'll get an error?
   
   Would it be possible to make this constant and `1024000` below into names 
constants with comments that explain what they do ?



##########
datafusion/sqllogictest/test_files/insert_to_external.slt:
##########
@@ -76,6 +76,44 @@ select * from dictionary_encoded_parquet_partitioned order 
by (a);
 a foo
 b bar
 
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned(
+  a varchar,
+  b varchar,
+) 
+STORED AS arrow
+LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
+PARTITIONED BY (b)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+insert into dictionary_encoded_arrow_partitioned 
+select * from dictionary_encoded_values
+----
+2
+
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback(
+  a varchar,
+) 
+STORED AS arrow
+LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/'
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query T
+select * from dictionary_encoded_arrow_test_readback;
+----
+b
+
+query error DataFusion error: Arrow error: Schema error: project index 1 out 
of bounds, max field 1

Review Comment:
   ```suggestion
   # https://github.com/apache/arrow-datafusion/issues/7816
   query error DataFusion error: Arrow error: Schema error: project index 1 out 
of bounds, max field 1
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to