This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d2cc4d253e `DataSink` additions (#7778)
d2cc4d253e is described below
commit d2cc4d253e13f32ac95545ae79a3cb2d4d59de78
Author: Daniël Heres <[email protected]>
AuthorDate: Tue Oct 10 14:55:18 2023 +0200
`DataSink` additions (#7778)
* File sink additions
* Fmt
* Clippy
* Update datafusion/physical-plan/src/insert.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Feedback
* Fmt
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/file_format/csv.rs | 9 +++++++++
datafusion/core/src/datasource/file_format/json.rs | 9 +++++++++
.../core/src/datasource/file_format/parquet.rs | 9 +++++++++
datafusion/core/src/datasource/memory.rs | 9 +++++++++
datafusion/physical-plan/src/insert.rs | 21 +++++++++++++++++++++
5 files changed, 57 insertions(+)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4c625b7ed7..e77382ad9c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
+use datafusion_physical_plan::metrics::MetricsSet;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
@@ -484,6 +485,14 @@ impl CsvSink {
#[async_trait]
impl DataSink for CsvSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
+ }
+
async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 6c260b9802..fa8fb5a723 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -25,6 +25,7 @@ use datafusion_common::DataFusionError;
use datafusion_common::FileType;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_plan::metrics::MetricsSet;
use rand::distributions::Alphanumeric;
use rand::distributions::DistString;
use std::fmt;
@@ -276,6 +277,14 @@ impl JsonSink {
#[async_trait]
impl DataSink for JsonSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
+ }
+
async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 062ec1329d..d946bfb0b9 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,6 +17,7 @@
//! Parquet format abstractions
+use datafusion_physical_plan::metrics::MetricsSet;
use parquet::column::writer::ColumnCloseResult;
use parquet::file::writer::SerializedFileWriter;
use rand::distributions::DistString;
@@ -757,6 +758,14 @@ impl ParquetSink {
#[async_trait]
impl DataSink for ParquetSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
+ }
+
async fn write_all(
&self,
mut data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 2766e73d33..ba99a2b695 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -17,6 +17,7 @@
//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
+use datafusion_physical_plan::metrics::MetricsSet;
use futures::StreamExt;
use log::debug;
use std::any::Any;
@@ -259,6 +260,14 @@ impl MemSink {
#[async_trait]
impl DataSink for MemSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
+ }
+
async fn write_all(
&self,
mut data: Vec<SendableRecordBatchStream>,
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index a7b0d32c8e..bff20e85b7 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -35,6 +35,7 @@ use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
+use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_execution::TaskContext;
@@ -46,6 +47,16 @@ use datafusion_execution::TaskContext;
/// output.
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
+ /// Returns the data sink as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+
+ /// Return a snapshot of the [MetricsSet] for this
+ /// [DataSink].
+ ///
+ /// See [ExecutionPlan::metrics()] for more details
+ fn metrics(&self) -> Option<MetricsSet>;
+
// TODO add desired input ordering
// How does this sink want its input ordered?
@@ -151,6 +162,16 @@ impl FileSinkExec {
}
Ok(streams)
}
+
+ /// Returns the underlying [`DataSink`]; use [`DataSink::as_any`] to downcast it
+ pub fn sink(&self) -> &dyn DataSink {
+ self.sink.as_ref()
+ }
+
+ /// Returns a snapshot of the underlying [`DataSink`]'s metrics, or `None` if it reports none
+ pub fn metrics(&self) -> Option<MetricsSet> {
+ self.sink.metrics()
+ }
}
impl DisplayAs for FileSinkExec {