devinjdangelo commented on code in PR #8608:
URL: https://github.com/apache/arrow-datafusion/pull/8608#discussion_r1435356536


##########
datafusion/core/src/datasource/file_format/arrow.rs:
##########
@@ -97,11 +110,191 @@ impl FileFormat for ArrowFormat {
         Ok(Arc::new(exec))
     }
 
+    async fn create_writer_physical_plan(
+        &self,
+        input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.overwrite {
+            return not_impl_err!("Overwrites are not implemented yet for Arrow 
format");
+        }
+
+        let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(ArrowFileSink::new(conf));
+
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
+    }
+
     fn file_type(&self) -> FileType {
         FileType::ARROW
     }
 }
 
+/// Implements [`DataSink`] for writing to arrow_ipc files
+struct ArrowFileSink {
+    config: FileSinkConfig,
+}
+
+impl ArrowFileSink {
+    fn new(config: FileSinkConfig) -> Self {
+        Self { config }
+    }
+
+    /// Converts table schema to writer schema, which may differ in the case
+    /// of hive style partitioning where some columns are removed from the
+    /// underlying files.
+    fn get_writer_schema(&self) -> Arc<Schema> {
+        if !self.config.table_partition_cols.is_empty() {
+            let schema = self.config.output_schema();
+            let partition_names: Vec<_> = self
+                .config
+                .table_partition_cols
+                .iter()
+                .map(|(s, _)| s)
+                .collect();
+            Arc::new(Schema::new(
+                schema
+                    .fields()
+                    .iter()
+                    .filter(|f| !partition_names.contains(&f.name()))
+                    .map(|f| (**f).clone())
+                    .collect::<Vec<_>>(),
+            ))
+        } else {
+            self.config.output_schema().clone()
+        }
+    }
+}
+
+impl Debug for ArrowFileSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ArrowFileSink").finish()
+    }
+}
+
+impl DisplayAs for ArrowFileSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ArrowFileSink(file_groups=",)?;
+                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                write!(f, ")")
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl DataSink for ArrowFileSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        // No props are supported yet, but can be by updating 
FileTypeWriterOptions
+        // to populate this struct and use those options to initialize the 
arrow_ipc::writer::FileWriter
+        let _arrow_props = 
self.config.file_type_writer_options.try_into_arrow()?;
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
+        let part_col = if !self.config.table_partition_cols.is_empty() {
+            Some(self.config.table_partition_cols.clone())
+        } else {
+            None
+        };
+
+        let (demux_task, mut file_stream_rx) = start_demuxer_task(
+            data,
+            context,
+            part_col,
+            self.config.table_paths[0].clone(),
+            "arrow".into(),
+            self.config.single_file_output,
+        );
+
+        let mut file_write_tasks: JoinSet<std::result::Result<usize, 
DataFusionError>> =
+            JoinSet::new();
+        while let Some((path, mut rx)) = file_stream_rx.recv().await {
+            let shared_buffer = SharedBuffer::new(1048576);

Review Comment:
   The initial buffer size is just a size hint for efficiency. It will grow 
beyond the set value if needed. 
   
   We can definitely make it a named constant, or even make it configurable. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to