This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d2cc4d253e `DataSink` additions (#7778)
d2cc4d253e is described below

commit d2cc4d253e13f32ac95545ae79a3cb2d4d59de78
Author: Daniël Heres <[email protected]>
AuthorDate: Tue Oct 10 14:55:18 2023 +0200

    `DataSink` additions (#7778)
    
    * File sink additions
    
    * Fmt
    
    * Clippy
    
    * Update datafusion/physical-plan/src/insert.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Feedback
    
    * Fmt
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/file_format/csv.rs   |  9 +++++++++
 datafusion/core/src/datasource/file_format/json.rs  |  9 +++++++++
 .../core/src/datasource/file_format/parquet.rs      |  9 +++++++++
 datafusion/core/src/datasource/memory.rs            |  9 +++++++++
 datafusion/physical-plan/src/insert.rs              | 21 +++++++++++++++++++++
 5 files changed, 57 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4c625b7ed7..e77382ad9c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
+use datafusion_physical_plan::metrics::MetricsSet;
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
@@ -484,6 +485,14 @@ impl CsvSink {
 
 #[async_trait]
 impl DataSink for CsvSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
     async fn write_all(
         &self,
         data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 6c260b9802..fa8fb5a723 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -25,6 +25,7 @@ use datafusion_common::DataFusionError;
 use datafusion_common::FileType;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_plan::metrics::MetricsSet;
 use rand::distributions::Alphanumeric;
 use rand::distributions::DistString;
 use std::fmt;
@@ -276,6 +277,14 @@ impl JsonSink {
 
 #[async_trait]
 impl DataSink for JsonSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
     async fn write_all(
         &self,
         data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 062ec1329d..d946bfb0b9 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,6 +17,7 @@
 
 //! Parquet format abstractions
 
+use datafusion_physical_plan::metrics::MetricsSet;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::file::writer::SerializedFileWriter;
 use rand::distributions::DistString;
@@ -757,6 +758,14 @@ impl ParquetSink {
 
 #[async_trait]
 impl DataSink for ParquetSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
     async fn write_all(
         &self,
         mut data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 2766e73d33..ba99a2b695 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -17,6 +17,7 @@
 
 //! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
 
+use datafusion_physical_plan::metrics::MetricsSet;
 use futures::StreamExt;
 use log::debug;
 use std::any::Any;
@@ -259,6 +260,14 @@ impl MemSink {
 
 #[async_trait]
 impl DataSink for MemSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
     async fn write_all(
         &self,
         mut data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index a7b0d32c8e..bff20e85b7 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -35,6 +35,7 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use crate::metrics::MetricsSet;
 use crate::stream::RecordBatchStreamAdapter;
 use datafusion_common::{exec_err, internal_err, DataFusionError};
 use datafusion_execution::TaskContext;
@@ -46,6 +47,16 @@ use datafusion_execution::TaskContext;
 /// output.
 #[async_trait]
 pub trait DataSink: DisplayAs + Debug + Send + Sync {
+    /// Returns the data sink as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Return a snapshot of the [MetricsSet] for this
+    /// [DataSink].
+    ///
+    /// See [ExecutionPlan::metrics()] for more details
+    fn metrics(&self) -> Option<MetricsSet>;
+
     // TODO add desired input ordering
     // How does this sink want its input ordered?
 
@@ -151,6 +162,16 @@ impl FileSinkExec {
         }
         Ok(streams)
     }
+
+    /// Returns insert sink
+    pub fn sink(&self) -> &dyn DataSink {
+        self.sink.as_ref()
+    }
+
+    /// Returns the metrics of the underlying [DataSink]
+    pub fn metrics(&self) -> Option<MetricsSet> {
+        self.sink.metrics()
+    }
 }
 
 impl DisplayAs for FileSinkExec {

Reply via email to