This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 97bd84acbb refactor: Move `Memtable` to catalog (#15459)
97bd84acbb is described below
commit 97bd84acbb96d17adee59f8e47b89db84fdf1d88
Author: logan-keede <[email protected]>
AuthorDate: Mon Mar 31 23:39:07 2025 +0530
refactor: Move `Memtable` to catalog (#15459)
* first iteration
* fix: CI
* move statistics
* Merge relics
* MemSink to datasource
* clean stuff + backward compatibility
* fix: cargo doc
* fix:fmt
---
datafusion/catalog/src/lib.rs | 2 +-
datafusion/catalog/src/memory/mod.rs | 6 +
datafusion/catalog/src/memory/table.rs | 296 ++++++++++++++++
.../src/datasource/{memory.rs => memory_test.rs} | 371 +--------------------
datafusion/core/src/datasource/mod.rs | 6 +-
datafusion/core/src/datasource/statistics.rs | 219 ------------
datafusion/datasource/src/memory.rs | 92 ++++-
datafusion/datasource/src/mod.rs | 1 +
datafusion/datasource/src/statistics.rs | 195 +++++++++++
9 files changed, 602 insertions(+), 586 deletions(-)
diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs
index f160bddd2b..0394b05277 100644
--- a/datafusion/catalog/src/lib.rs
+++ b/datafusion/catalog/src/lib.rs
@@ -50,7 +50,7 @@ pub use catalog::*;
pub use datafusion_session::Session;
pub use dynamic_file::catalog::*;
pub use memory::{
- MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
+ MemTable, MemoryCatalogProvider, MemoryCatalogProviderList,
MemorySchemaProvider,
};
pub use r#async::*;
pub use schema::*;
diff --git a/datafusion/catalog/src/memory/mod.rs b/datafusion/catalog/src/memory/mod.rs
index 4c5cf1a9ae..541d25b334 100644
--- a/datafusion/catalog/src/memory/mod.rs
+++ b/datafusion/catalog/src/memory/mod.rs
@@ -17,6 +17,12 @@
pub(crate) mod catalog;
pub(crate) mod schema;
+pub(crate) mod table;
pub use catalog::*;
pub use schema::*;
+pub use table::*;
+
+// backward compatibility
+pub use datafusion_datasource::memory::MemorySourceConfig;
+pub use datafusion_datasource::source::DataSourceExec;
diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs
new file mode 100644
index 0000000000..81243e2c48
--- /dev/null
+++ b/datafusion/catalog/src/memory/table.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::TableProvider;
+use datafusion_common::error::Result;
+use datafusion_expr::Expr;
+use datafusion_expr::TableType;
+use datafusion_physical_expr::create_physical_sort_exprs;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::{
+ common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema,
SchemaExt};
+use datafusion_common_runtime::JoinSet;
+use datafusion_datasource::memory::MemSink;
+use datafusion_datasource::memory::MemorySourceConfig;
+use datafusion_datasource::sink::DataSinkExec;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_expr::dml::InsertOp;
+use datafusion_expr::SortExpr;
+use datafusion_session::Session;
+
+use async_trait::async_trait;
+use futures::StreamExt;
+use log::debug;
+use parking_lot::Mutex;
+use tokio::sync::RwLock;
+
+// backward compatibility
+pub use datafusion_datasource::memory::PartitionData;
+
+/// In-memory data source for presenting a `Vec<RecordBatch>` as a
+/// data source that can be queried by DataFusion. This allows data to
+/// be pre-loaded into memory and then repeatedly queried without
+/// incurring additional file I/O overhead.
+#[derive(Debug)]
+pub struct MemTable {
+ schema: SchemaRef,
+ // batches used to be pub(crate), but it's needed to be public for the
tests
+ pub batches: Vec<PartitionData>,
+ constraints: Constraints,
+ column_defaults: HashMap<String, Expr>,
+ /// Optional pre-known sort order(s). Must be `SortExpr`s.
+ /// inserting data into this table removes the order
+ pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
+}
+
+impl MemTable {
+ /// Create a new in-memory table from the provided schema and record
batches
+ pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) ->
Result<Self> {
+ for batches in partitions.iter().flatten() {
+ let batches_schema = batches.schema();
+ if !schema.contains(&batches_schema) {
+ debug!(
+ "mem table schema does not contain batches schema. \
+ Target_schema: {schema:?}. Batches Schema:
{batches_schema:?}"
+ );
+ return plan_err!("Mismatch between schema and batches");
+ }
+ }
+
+ Ok(Self {
+ schema,
+ batches: partitions
+ .into_iter()
+ .map(|e| Arc::new(RwLock::new(e)))
+ .collect::<Vec<_>>(),
+ constraints: Constraints::empty(),
+ column_defaults: HashMap::new(),
+ sort_order: Arc::new(Mutex::new(vec![])),
+ })
+ }
+
+ /// Assign constraints
+ pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+ self.constraints = constraints;
+ self
+ }
+
+ /// Assign column defaults
+ pub fn with_column_defaults(
+ mut self,
+ column_defaults: HashMap<String, Expr>,
+ ) -> Self {
+ self.column_defaults = column_defaults;
+ self
+ }
+
+ /// Specify an optional pre-known sort order(s). Must be `SortExpr`s.
+ ///
+ /// If the data is not sorted by this order, DataFusion may produce
+ /// incorrect results.
+ ///
+ /// DataFusion may take advantage of this ordering to omit sorts
+ /// or use more efficient algorithms.
+ ///
+ /// Note that multiple sort orders are supported, if some are known to be
+ /// equivalent,
+ pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
+ std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
+ self
+ }
+
+ /// Create a mem table by reading from another data source
+ pub async fn load(
+ t: Arc<dyn TableProvider>,
+ output_partitions: Option<usize>,
+ state: &dyn Session,
+ ) -> Result<Self> {
+ let schema = t.schema();
+ let constraints = t.constraints();
+ let exec = t.scan(state, None, &[], None).await?;
+ let partition_count = exec.output_partitioning().partition_count();
+
+ let mut join_set = JoinSet::new();
+
+ for part_idx in 0..partition_count {
+ let task = state.task_ctx();
+ let exec = Arc::clone(&exec);
+ join_set.spawn(async move {
+ let stream = exec.execute(part_idx, task)?;
+ common::collect(stream).await
+ });
+ }
+
+ let mut data: Vec<Vec<RecordBatch>> =
+ Vec::with_capacity(exec.output_partitioning().partition_count());
+
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(res) => data.push(res?),
+ Err(e) => {
+ if e.is_panic() {
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ unreachable!();
+ }
+ }
+ }
+ }
+
+ let mut exec =
DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
+ &data,
+ Arc::clone(&schema),
+ None,
+ )?));
+ if let Some(cons) = constraints {
+ exec = exec.with_constraints(cons.clone());
+ }
+
+ if let Some(num_partitions) = output_partitions {
+ let exec = RepartitionExec::try_new(
+ Arc::new(exec),
+ Partitioning::RoundRobinBatch(num_partitions),
+ )?;
+
+ // execute and collect results
+ let mut output_partitions = vec![];
+ for i in
0..exec.properties().output_partitioning().partition_count() {
+ // execute this *output* partition and collect all batches
+ let task_ctx = state.task_ctx();
+ let mut stream = exec.execute(i, task_ctx)?;
+ let mut batches = vec![];
+ while let Some(result) = stream.next().await {
+ batches.push(result?);
+ }
+ output_partitions.push(batches);
+ }
+
+ return MemTable::try_new(Arc::clone(&schema), output_partitions);
+ }
+ MemTable::try_new(Arc::clone(&schema), data)
+ }
+}
+
+#[async_trait]
+impl TableProvider for MemTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+
+ fn constraints(&self) -> Option<&Constraints> {
+ Some(&self.constraints)
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let mut partitions = vec![];
+ for arc_inner_vec in self.batches.iter() {
+ let inner_vec = arc_inner_vec.read().await;
+ partitions.push(inner_vec.clone())
+ }
+
+ let mut source =
+ MemorySourceConfig::try_new(&partitions, self.schema(),
projection.cloned())?;
+
+ let show_sizes = state.config_options().explain.show_sizes;
+ source = source.with_show_sizes(show_sizes);
+
+ // add sort information if present
+ let sort_order = self.sort_order.lock();
+ if !sort_order.is_empty() {
+ let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
+
+ let file_sort_order = sort_order
+ .iter()
+ .map(|sort_exprs| {
+ create_physical_sort_exprs(
+ sort_exprs,
+ &df_schema,
+ state.execution_props(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+ source = source.try_with_sort_information(file_sort_order)?;
+ }
+
+ Ok(DataSourceExec::from_data_source(source))
+ }
+
+ /// Returns an ExecutionPlan that inserts the execution results of a given
[`ExecutionPlan`] into this [`MemTable`].
+ ///
+ /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
+ ///
+ /// # Arguments
+ ///
+ /// * `state` - The [`SessionState`] containing the context for executing
the plan.
+ /// * `input` - The [`ExecutionPlan`] to execute and insert.
+ ///
+ /// # Returns
+ ///
+ /// * A plan that returns the number of rows written.
+ ///
+ /// [`SessionState`]:
https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
+ async fn insert_into(
+ &self,
+ _state: &dyn Session,
+ input: Arc<dyn ExecutionPlan>,
+ insert_op: InsertOp,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // If we are inserting into the table, any sort order may be messed up
so reset it here
+ *self.sort_order.lock() = vec![];
+
+ // Create a physical plan from the logical plan.
+ // Check that the schema of the plan matches the schema of this table.
+ self.schema()
+ .logically_equivalent_names_and_types(&input.schema())?;
+
+ if insert_op != InsertOp::Append {
+ return not_impl_err!("{insert_op} not implemented for MemoryTable
yet");
+ }
+ let sink = MemSink::try_new(self.batches.clone(),
Arc::clone(&self.schema))?;
+ Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
+ }
+
+ fn get_column_default(&self, column: &str) -> Option<&Expr> {
+ self.column_defaults.get(column)
+ }
+}
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory_test.rs
similarity index 58%
rename from datafusion/core/src/datasource/memory.rs
rename to datafusion/core/src/datasource/memory_test.rs
index 0288cd3e8b..381000ab8e 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory_test.rs
@@ -15,378 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
-
-use std::any::Any;
-use std::collections::HashMap;
-use std::fmt::{self, Debug};
-use std::sync::Arc;
-
-use crate::datasource::{TableProvider, TableType};
-use crate::error::Result;
-use crate::logical_expr::Expr;
-use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::{
- common, DisplayAs, DisplayFormatType, ExecutionPlan,
ExecutionPlanProperties,
- Partitioning, SendableRecordBatchStream,
-};
-use crate::physical_planner::create_physical_sort_exprs;
-
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use datafusion_catalog::Session;
-use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema,
SchemaExt};
-use datafusion_common_runtime::JoinSet;
-pub use datafusion_datasource::memory::MemorySourceConfig;
-use datafusion_datasource::sink::{DataSink, DataSinkExec};
-pub use datafusion_datasource::source::DataSourceExec;
-use datafusion_execution::TaskContext;
-use datafusion_expr::dml::InsertOp;
-use datafusion_expr::SortExpr;
-
-use async_trait::async_trait;
-use futures::StreamExt;
-use log::debug;
-use parking_lot::Mutex;
-use tokio::sync::RwLock;
-
-/// Type alias for partition data
-pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
-
-/// In-memory data source for presenting a `Vec<RecordBatch>` as a
-/// data source that can be queried by DataFusion. This allows data to
-/// be pre-loaded into memory and then repeatedly queried without
-/// incurring additional file I/O overhead.
-#[derive(Debug)]
-pub struct MemTable {
- schema: SchemaRef,
- pub(crate) batches: Vec<PartitionData>,
- constraints: Constraints,
- column_defaults: HashMap<String, Expr>,
- /// Optional pre-known sort order(s). Must be `SortExpr`s.
- /// inserting data into this table removes the order
- pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
-}
-
-impl MemTable {
- /// Create a new in-memory table from the provided schema and record
batches
- pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) ->
Result<Self> {
- for batches in partitions.iter().flatten() {
- let batches_schema = batches.schema();
- if !schema.contains(&batches_schema) {
- debug!(
- "mem table schema does not contain batches schema. \
- Target_schema: {schema:?}. Batches Schema:
{batches_schema:?}"
- );
- return plan_err!("Mismatch between schema and batches");
- }
- }
-
- Ok(Self {
- schema,
- batches: partitions
- .into_iter()
- .map(|e| Arc::new(RwLock::new(e)))
- .collect::<Vec<_>>(),
- constraints: Constraints::empty(),
- column_defaults: HashMap::new(),
- sort_order: Arc::new(Mutex::new(vec![])),
- })
- }
-
- /// Assign constraints
- pub fn with_constraints(mut self, constraints: Constraints) -> Self {
- self.constraints = constraints;
- self
- }
-
- /// Assign column defaults
- pub fn with_column_defaults(
- mut self,
- column_defaults: HashMap<String, Expr>,
- ) -> Self {
- self.column_defaults = column_defaults;
- self
- }
-
- /// Specify an optional pre-known sort order(s). Must be `SortExpr`s.
- ///
- /// If the data is not sorted by this order, DataFusion may produce
- /// incorrect results.
- ///
- /// DataFusion may take advantage of this ordering to omit sorts
- /// or use more efficient algorithms.
- ///
- /// Note that multiple sort orders are supported, if some are known to be
- /// equivalent,
- pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
- std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
- self
- }
-
- /// Create a mem table by reading from another data source
- pub async fn load(
- t: Arc<dyn TableProvider>,
- output_partitions: Option<usize>,
- state: &dyn Session,
- ) -> Result<Self> {
- let schema = t.schema();
- let constraints = t.constraints();
- let exec = t.scan(state, None, &[], None).await?;
- let partition_count = exec.output_partitioning().partition_count();
-
- let mut join_set = JoinSet::new();
-
- for part_idx in 0..partition_count {
- let task = state.task_ctx();
- let exec = Arc::clone(&exec);
- join_set.spawn(async move {
- let stream = exec.execute(part_idx, task)?;
- common::collect(stream).await
- });
- }
-
- let mut data: Vec<Vec<RecordBatch>> =
- Vec::with_capacity(exec.output_partitioning().partition_count());
-
- while let Some(result) = join_set.join_next().await {
- match result {
- Ok(res) => data.push(res?),
- Err(e) => {
- if e.is_panic() {
- std::panic::resume_unwind(e.into_panic());
- } else {
- unreachable!();
- }
- }
- }
- }
-
- let mut exec =
DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
- &data,
- Arc::clone(&schema),
- None,
- )?));
- if let Some(cons) = constraints {
- exec = exec.with_constraints(cons.clone());
- }
-
- if let Some(num_partitions) = output_partitions {
- let exec = RepartitionExec::try_new(
- Arc::new(exec),
- Partitioning::RoundRobinBatch(num_partitions),
- )?;
-
- // execute and collect results
- let mut output_partitions = vec![];
- for i in
0..exec.properties().output_partitioning().partition_count() {
- // execute this *output* partition and collect all batches
- let task_ctx = state.task_ctx();
- let mut stream = exec.execute(i, task_ctx)?;
- let mut batches = vec![];
- while let Some(result) = stream.next().await {
- batches.push(result?);
- }
- output_partitions.push(batches);
- }
-
- return MemTable::try_new(Arc::clone(&schema), output_partitions);
- }
- MemTable::try_new(Arc::clone(&schema), data)
- }
-}
-
-#[async_trait]
-impl TableProvider for MemTable {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn schema(&self) -> SchemaRef {
- Arc::clone(&self.schema)
- }
-
- fn constraints(&self) -> Option<&Constraints> {
- Some(&self.constraints)
- }
-
- fn table_type(&self) -> TableType {
- TableType::Base
- }
-
- async fn scan(
- &self,
- state: &dyn Session,
- projection: Option<&Vec<usize>>,
- _filters: &[Expr],
- _limit: Option<usize>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let mut partitions = vec![];
- for arc_inner_vec in self.batches.iter() {
- let inner_vec = arc_inner_vec.read().await;
- partitions.push(inner_vec.clone())
- }
-
- let mut source =
- MemorySourceConfig::try_new(&partitions, self.schema(),
projection.cloned())?;
-
- let show_sizes = state.config_options().explain.show_sizes;
- source = source.with_show_sizes(show_sizes);
-
- // add sort information if present
- let sort_order = self.sort_order.lock();
- if !sort_order.is_empty() {
- let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
-
- let file_sort_order = sort_order
- .iter()
- .map(|sort_exprs| {
- create_physical_sort_exprs(
- sort_exprs,
- &df_schema,
- state.execution_props(),
- )
- })
- .collect::<Result<Vec<_>>>()?;
- source = source.try_with_sort_information(file_sort_order)?;
- }
-
- Ok(DataSourceExec::from_data_source(source))
- }
-
- /// Returns an ExecutionPlan that inserts the execution results of a given
[`ExecutionPlan`] into this [`MemTable`].
- ///
- /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
- ///
- /// # Arguments
- ///
- /// * `state` - The [`SessionState`] containing the context for executing
the plan.
- /// * `input` - The [`ExecutionPlan`] to execute and insert.
- ///
- /// # Returns
- ///
- /// * A plan that returns the number of rows written.
- ///
- /// [`SessionState`]: crate::execution::context::SessionState
- async fn insert_into(
- &self,
- _state: &dyn Session,
- input: Arc<dyn ExecutionPlan>,
- insert_op: InsertOp,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- // If we are inserting into the table, any sort order may be messed up
so reset it here
- *self.sort_order.lock() = vec![];
-
- // Create a physical plan from the logical plan.
- // Check that the schema of the plan matches the schema of this table.
- self.schema()
- .logically_equivalent_names_and_types(&input.schema())?;
-
- if insert_op != InsertOp::Append {
- return not_impl_err!("{insert_op} not implemented for MemoryTable
yet");
- }
- let sink = MemSink::try_new(self.batches.clone(),
Arc::clone(&self.schema))?;
- Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
- }
-
- fn get_column_default(&self, column: &str) -> Option<&Expr> {
- self.column_defaults.get(column)
- }
-}
-
-/// Implements for writing to a [`MemTable`]
-struct MemSink {
- /// Target locations for writing data
- batches: Vec<PartitionData>,
- schema: SchemaRef,
-}
-
-impl Debug for MemSink {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("MemSink")
- .field("num_partitions", &self.batches.len())
- .finish()
- }
-}
-
-impl DisplayAs for MemSink {
- fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
- match t {
- DisplayFormatType::Default | DisplayFormatType::Verbose => {
- let partition_count = self.batches.len();
- write!(f, "MemoryTable (partitions={partition_count})")
- }
- DisplayFormatType::TreeRender => {
- // TODO: collect info
- write!(f, "")
- }
- }
- }
-}
-
-impl MemSink {
- /// Creates a new [`MemSink`].
- ///
- /// The caller is responsible for ensuring that there is at least one
partition to insert into.
- fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self>
{
- if batches.is_empty() {
- return plan_err!("Cannot insert into MemTable with zero
partitions");
- }
- Ok(Self { batches, schema })
- }
-}
-
-#[async_trait]
-impl DataSink for MemSink {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn schema(&self) -> &SchemaRef {
- &self.schema
- }
-
- async fn write_all(
- &self,
- mut data: SendableRecordBatchStream,
- _context: &Arc<TaskContext>,
- ) -> Result<u64> {
- let num_partitions = self.batches.len();
-
- // buffer up the data round robin style into num_partitions
-
- let mut new_batches = vec![vec![]; num_partitions];
- let mut i = 0;
- let mut row_count = 0;
- while let Some(batch) = data.next().await.transpose()? {
- row_count += batch.num_rows();
- new_batches[i].push(batch);
- i = (i + 1) % num_partitions;
- }
-
- // write the outputs into the batches
- for (target, mut batches) in
self.batches.iter().zip(new_batches.into_iter()) {
- // Append all the new batches in one go to minimize locking
overhead
- target.write().await.append(&mut batches);
- }
-
- Ok(row_count as u64)
- }
-}
-
#[cfg(test)]
mod tests {
- use super::*;
+ use crate::datasource::MemTable;
use crate::datasource::{provider_as_source, DefaultTableSource};
use crate::physical_plan::collect;
use crate::prelude::SessionContext;
-
use arrow::array::{AsArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
use arrow::error::ArrowError;
- use datafusion_common::DataFusionError;
+ use arrow::record_batch::RecordBatch;
+ use arrow_schema::SchemaRef;
+ use datafusion_catalog::TableProvider;
+ use datafusion_common::{DataFusionError, Result};
+ use datafusion_expr::dml::InsertOp;
use datafusion_expr::LogicalPlanBuilder;
+ use futures::StreamExt;
+ use std::collections::HashMap;
+ use std::sync::Arc;
#[tokio::test]
async fn test_with_projection() -> Result<()> {
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 35a451cbc8..a195c9a882 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -24,10 +24,9 @@ pub mod empty;
pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
-pub mod memory;
+mod memory_test;
pub mod physical_plan;
pub mod provider;
-mod statistics;
mod view_test;
// backwards compatibility
@@ -40,14 +39,15 @@ pub use crate::catalog::TableProvider;
pub use crate::logical_expr::TableType;
pub use datafusion_catalog::cte_worktable;
pub use datafusion_catalog::default_table_source;
+pub use datafusion_catalog::memory;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
+pub use datafusion_datasource::get_statistics_with_limit;
pub use datafusion_datasource::schema_adapter;
pub use datafusion_datasource::sink;
pub use datafusion_datasource::source;
pub use datafusion_execution::object_store;
pub use datafusion_physical_expr::create_ordering;
-pub use statistics::get_statistics_with_limit;
#[cfg(all(test, feature = "parquet"))]
mod tests {
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
deleted file mode 100644
index cf283ecee0..0000000000
--- a/datafusion/core/src/datasource/statistics.rs
+++ /dev/null
@@ -1,219 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::mem;
-use std::sync::Arc;
-
-use futures::{Stream, StreamExt};
-
-use crate::arrow::datatypes::SchemaRef;
-use crate::error::Result;
-use crate::physical_plan::{ColumnStatistics, Statistics};
-use datafusion_common::stats::Precision;
-use datafusion_common::ScalarValue;
-use datafusion_datasource::file_groups::FileGroup;
-
-use super::listing::PartitionedFile;
-
-/// Get all files as well as the file level summary statistics (no statistic
for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files.
Needed to read up to
-/// `limit` number of rows. `collect_stats` is passed down from the
configuration parameter on
-/// `ListingTable`. If it is false we only construct bare statistics and skip
a potentially expensive
-/// call to `multiunzip` for constructing file level summary statistics.
-pub async fn get_statistics_with_limit(
- all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
- file_schema: SchemaRef,
- limit: Option<usize>,
- collect_stats: bool,
-) -> Result<(FileGroup, Statistics)> {
- let mut result_files = FileGroup::default();
- // These statistics can be calculated as long as at least one file provides
- // useful information. If none of the files provides any information, then
- // they will end up having `Precision::Absent` values. Throughout
calculations,
- // missing values will be imputed as:
- // - zero for summations, and
- // - neutral element for extreme points.
- let size = file_schema.fields().len();
- let mut col_stats_set = vec![ColumnStatistics::default(); size];
- let mut num_rows = Precision::<usize>::Absent;
- let mut total_byte_size = Precision::<usize>::Absent;
-
- // Fusing the stream allows us to call next safely even once it is
finished.
- let mut all_files = Box::pin(all_files.fuse());
-
- if let Some(first_file) = all_files.next().await {
- let (mut file, file_stats) = first_file?;
- file.statistics = Some(file_stats.as_ref().clone());
- result_files.push(file);
-
- // First file, we set them directly from the file statistics.
- num_rows = file_stats.num_rows;
- total_byte_size = file_stats.total_byte_size;
- for (index, file_column) in
- file_stats.column_statistics.clone().into_iter().enumerate()
- {
- col_stats_set[index].null_count = file_column.null_count;
- col_stats_set[index].max_value = file_column.max_value;
- col_stats_set[index].min_value = file_column.min_value;
- col_stats_set[index].sum_value = file_column.sum_value;
- }
-
- // If the number of rows exceeds the limit, we can stop processing
- // files. This only applies when we know the number of rows. It also
- // currently ignores tables that have no statistics regarding the
- // number of rows.
- let conservative_num_rows = match num_rows {
- Precision::Exact(nr) => nr,
- _ => usize::MIN,
- };
- if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
- while let Some(current) = all_files.next().await {
- let (mut file, file_stats) = current?;
- file.statistics = Some(file_stats.as_ref().clone());
- result_files.push(file);
- if !collect_stats {
- continue;
- }
-
- // We accumulate the number of rows, total byte size and null
- // counts across all the files in question. If any file does
not
- // provide any information or provides an inexact value, we
demote
- // the statistic precision to inexact.
- num_rows = add_row_stats(file_stats.num_rows, num_rows);
-
- total_byte_size =
- add_row_stats(file_stats.total_byte_size, total_byte_size);
-
- for (file_col_stats, col_stats) in file_stats
- .column_statistics
- .iter()
- .zip(col_stats_set.iter_mut())
- {
- let ColumnStatistics {
- null_count: file_nc,
- max_value: file_max,
- min_value: file_min,
- sum_value: file_sum,
- distinct_count: _,
- } = file_col_stats;
-
- col_stats.null_count = add_row_stats(*file_nc,
col_stats.null_count);
- set_max_if_greater(file_max, &mut col_stats.max_value);
- set_min_if_lesser(file_min, &mut col_stats.min_value);
- col_stats.sum_value = file_sum.add(&col_stats.sum_value);
- }
-
- // If the number of rows exceeds the limit, we can stop
processing
- // files. This only applies when we know the number of rows.
It also
- // currently ignores tables that have no statistics regarding
the
- // number of rows.
- if num_rows.get_value().unwrap_or(&usize::MIN)
- > &limit.unwrap_or(usize::MAX)
- {
- break;
- }
- }
- }
- };
-
- let mut statistics = Statistics {
- num_rows,
- total_byte_size,
- column_statistics: col_stats_set,
- };
- if all_files.next().await.is_some() {
- // If we still have files in the stream, it means that the limit kicked
- // in, and the statistic could have been different had we processed the
- // files in a different order.
- statistics = statistics.to_inexact()
- }
-
- Ok((result_files, statistics))
-}
-
-fn add_row_stats(
- file_num_rows: Precision<usize>,
- num_rows: Precision<usize>,
-) -> Precision<usize> {
- match (file_num_rows, &num_rows) {
- (Precision::Absent, _) => num_rows.to_inexact(),
- (lhs, Precision::Absent) => lhs.to_inexact(),
- (lhs, rhs) => lhs.add(rhs),
- }
-}
-
-/// If the given value is numerically greater than the original maximum value,
-/// return the new maximum value with appropriate exactness information.
-fn set_max_if_greater(
- max_nominee: &Precision<ScalarValue>,
- max_value: &mut Precision<ScalarValue>,
-) {
- match (&max_value, max_nominee) {
- (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
- *max_value = max_nominee.clone();
- }
- (Precision::Exact(val1), Precision::Inexact(val2))
- | (Precision::Inexact(val1), Precision::Inexact(val2))
- | (Precision::Inexact(val1), Precision::Exact(val2))
- if val1 < val2 =>
- {
- *max_value = max_nominee.clone().to_inexact();
- }
- (Precision::Exact(_), Precision::Absent) => {
- let exact_max = mem::take(max_value);
- *max_value = exact_max.to_inexact();
- }
- (Precision::Absent, Precision::Exact(_)) => {
- *max_value = max_nominee.clone().to_inexact();
- }
- (Precision::Absent, Precision::Inexact(_)) => {
- *max_value = max_nominee.clone();
- }
- _ => {}
- }
-}
-
-/// If the given value is numerically lesser than the original minimum value,
-/// return the new minimum value with appropriate exactness information.
-fn set_min_if_lesser(
- min_nominee: &Precision<ScalarValue>,
- min_value: &mut Precision<ScalarValue>,
-) {
- match (&min_value, min_nominee) {
- (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
- *min_value = min_nominee.clone();
- }
- (Precision::Exact(val1), Precision::Inexact(val2))
- | (Precision::Inexact(val1), Precision::Inexact(val2))
- | (Precision::Inexact(val1), Precision::Exact(val2))
- if val1 > val2 =>
- {
- *min_value = min_nominee.clone().to_inexact();
- }
- (Precision::Exact(_), Precision::Absent) => {
- let exact_min = mem::take(min_value);
- *min_value = exact_min.to_inexact();
- }
- (Precision::Absent, Precision::Exact(_)) => {
- *min_value = min_nominee.clone().to_inexact();
- }
- (Precision::Absent, Precision::Inexact(_)) => {
- *min_value = min_nominee.clone();
- }
- _ => {}
- }
-}
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index f2e36672cd..6d0e16ef4b 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -19,9 +19,12 @@
use std::any::Any;
use std::fmt;
+use std::fmt::Debug;
use std::sync::Arc;
+use crate::sink::DataSink;
use crate::source::{DataSource, DataSourceExec};
+use async_trait::async_trait;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
@@ -42,6 +45,8 @@ use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use futures::StreamExt;
+use tokio::sync::RwLock;
/// Execution plan for reading in-memory batches of data
#[derive(Clone)]
@@ -62,7 +67,7 @@ pub struct MemoryExec {
}
#[allow(unused, deprecated)]
-impl fmt::Debug for MemoryExec {
+impl Debug for MemoryExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt_as(DisplayFormatType::Default, f)
}
@@ -720,6 +725,91 @@ impl MemorySourceConfig {
}
}
+/// Type alias for partition data
+pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+/// Implements writing to a [`MemTable`]
+///
+/// [`MemTable`]:
<https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
+pub struct MemSink {
+ /// Target locations for writing data
+ batches: Vec<PartitionData>,
+ schema: SchemaRef,
+}
+
+impl Debug for MemSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MemSink")
+ .field("num_partitions", &self.batches.len())
+ .finish()
+ }
+}
+
+impl DisplayAs for MemSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let partition_count = self.batches.len();
+ write!(f, "MemoryTable (partitions={partition_count})")
+ }
+ DisplayFormatType::TreeRender => {
+ // TODO: collect info
+ write!(f, "")
+ }
+ }
+ }
+}
+
+impl MemSink {
+ /// Creates a new [`MemSink`].
+ ///
+ /// The caller is responsible for ensuring that there is at least one
partition to insert into.
+ pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) ->
Result<Self> {
+ if batches.is_empty() {
+ return plan_err!("Cannot insert into MemTable with zero
partitions");
+ }
+ Ok(Self { batches, schema })
+ }
+}
+
+#[async_trait]
+impl DataSink for MemSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ async fn write_all(
+ &self,
+ mut data: SendableRecordBatchStream,
+ _context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = self.batches.len();
+
+ // buffer up the data round robin style into num_partitions
+
+ let mut new_batches = vec![vec![]; num_partitions];
+ let mut i = 0;
+ let mut row_count = 0;
+ while let Some(batch) = data.next().await.transpose()? {
+ row_count += batch.num_rows();
+ new_batches[i].push(batch);
+ i = (i + 1) % num_partitions;
+ }
+
+ // write the outputs into the batches
+ for (target, mut batches) in
self.batches.iter().zip(new_batches.into_iter()) {
+ // Append all the new batches in one go to minimize locking
overhead
+ target.write().await.append(&mut batches);
+ }
+
+ Ok(row_count as u64)
+ }
+}
+
#[cfg(test)]
mod memory_source_tests {
use std::sync::Arc;
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index fb119d1b3d..1e7ea1255d 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -60,6 +60,7 @@ use std::pin::Pin;
use std::sync::Arc;
pub use self::url::ListingTableUrl;
+pub use statistics::get_statistics_with_limit;
/// Stream of files get listed from object store
pub type PartitionedFileStream =
diff --git a/datafusion/datasource/src/statistics.rs
b/datafusion/datasource/src/statistics.rs
index cd002a9668..801315568a 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -20,8 +20,11 @@
//! Currently, this module houses code to sort file groups if they are
non-overlapping with
//! respect to the required sort order. See [`MinMaxStatistics`]
+use futures::{Stream, StreamExt};
+use std::mem;
use std::sync::Arc;
+use crate::file_groups::FileGroup;
use crate::PartitionedFile;
use arrow::array::RecordBatch;
@@ -30,9 +33,12 @@ use arrow::{
compute::SortColumn,
row::{Row, Rows},
};
+use datafusion_common::stats::Precision;
+use datafusion_common::ScalarValue;
use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError,
Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::{ColumnStatistics, Statistics};
/// A normalized representation of file min/max statistics that allows for
efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
@@ -281,3 +287,192 @@ fn sort_columns_from_physical_sort_exprs(
.map(|expr| expr.expr.as_any().downcast_ref::<Column>())
.collect::<Option<Vec<_>>>()
}
+
+/// Get all files as well as the file level summary statistics (no statistic
for partition columns).
+/// If the optional `limit` is provided, includes only sufficient files.
Needed to read up to
+/// `limit` number of rows. `collect_stats` is passed down from the
configuration parameter on
+/// `ListingTable`. If it is false we only construct bare statistics and skip
a potentially expensive
+/// call to `multiunzip` for constructing file level summary statistics.
+pub async fn get_statistics_with_limit(
+ all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
+ file_schema: SchemaRef,
+ limit: Option<usize>,
+ collect_stats: bool,
+) -> Result<(FileGroup, Statistics)> {
+ let mut result_files = FileGroup::default();
+ // These statistics can be calculated as long as at least one file provides
+ // useful information. If none of the files provides any information, then
+ // they will end up having `Precision::Absent` values. Throughout
calculations,
+ // missing values will be imputed as:
+ // - zero for summations, and
+ // - neutral element for extreme points.
+ let size = file_schema.fields().len();
+ let mut col_stats_set = vec![ColumnStatistics::default(); size];
+ let mut num_rows = Precision::<usize>::Absent;
+ let mut total_byte_size = Precision::<usize>::Absent;
+
+ // Fusing the stream allows us to call next safely even once it is
finished.
+ let mut all_files = Box::pin(all_files.fuse());
+
+ if let Some(first_file) = all_files.next().await {
+ let (mut file, file_stats) = first_file?;
+ file.statistics = Some(file_stats.as_ref().clone());
+ result_files.push(file);
+
+ // First file, we set them directly from the file statistics.
+ num_rows = file_stats.num_rows;
+ total_byte_size = file_stats.total_byte_size;
+ for (index, file_column) in
+ file_stats.column_statistics.clone().into_iter().enumerate()
+ {
+ col_stats_set[index].null_count = file_column.null_count;
+ col_stats_set[index].max_value = file_column.max_value;
+ col_stats_set[index].min_value = file_column.min_value;
+ col_stats_set[index].sum_value = file_column.sum_value;
+ }
+
+ // If the number of rows exceeds the limit, we can stop processing
+ // files. This only applies when we know the number of rows. It also
+ // currently ignores tables that have no statistics regarding the
+ // number of rows.
+ let conservative_num_rows = match num_rows {
+ Precision::Exact(nr) => nr,
+ _ => usize::MIN,
+ };
+ if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+ while let Some(current) = all_files.next().await {
+ let (mut file, file_stats) = current?;
+ file.statistics = Some(file_stats.as_ref().clone());
+ result_files.push(file);
+ if !collect_stats {
+ continue;
+ }
+
+ // We accumulate the number of rows, total byte size and null
+ // counts across all the files in question. If any file does
not
+ // provide any information or provides an inexact value, we
demote
+ // the statistic precision to inexact.
+ num_rows = add_row_stats(file_stats.num_rows, num_rows);
+
+ total_byte_size =
+ add_row_stats(file_stats.total_byte_size, total_byte_size);
+
+ for (file_col_stats, col_stats) in file_stats
+ .column_statistics
+ .iter()
+ .zip(col_stats_set.iter_mut())
+ {
+ let ColumnStatistics {
+ null_count: file_nc,
+ max_value: file_max,
+ min_value: file_min,
+ sum_value: file_sum,
+ distinct_count: _,
+ } = file_col_stats;
+
+ col_stats.null_count = add_row_stats(*file_nc,
col_stats.null_count);
+ set_max_if_greater(file_max, &mut col_stats.max_value);
+ set_min_if_lesser(file_min, &mut col_stats.min_value);
+ col_stats.sum_value = file_sum.add(&col_stats.sum_value);
+ }
+
+ // If the number of rows exceeds the limit, we can stop
processing
+ // files. This only applies when we know the number of rows.
It also
+ // currently ignores tables that have no statistics regarding
the
+ // number of rows.
+ if num_rows.get_value().unwrap_or(&usize::MIN)
+ > &limit.unwrap_or(usize::MAX)
+ {
+ break;
+ }
+ }
+ }
+ };
+
+ let mut statistics = Statistics {
+ num_rows,
+ total_byte_size,
+ column_statistics: col_stats_set,
+ };
+ if all_files.next().await.is_some() {
+ // If we still have files in the stream, it means that the limit kicked
+ // in, and the statistic could have been different had we processed the
+ // files in a different order.
+ statistics = statistics.to_inexact()
+ }
+
+ Ok((result_files, statistics))
+}
+
+fn add_row_stats(
+ file_num_rows: Precision<usize>,
+ num_rows: Precision<usize>,
+) -> Precision<usize> {
+ match (file_num_rows, &num_rows) {
+ (Precision::Absent, _) => num_rows.to_inexact(),
+ (lhs, Precision::Absent) => lhs.to_inexact(),
+ (lhs, rhs) => lhs.add(rhs),
+ }
+}
+
+/// If the given value is numerically greater than the original maximum value,
+/// return the new maximum value with appropriate exactness information.
+fn set_max_if_greater(
+ max_nominee: &Precision<ScalarValue>,
+ max_value: &mut Precision<ScalarValue>,
+) {
+ match (&max_value, max_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+ *max_value = max_nominee.clone();
+ }
+ (Precision::Exact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Exact(val2))
+ if val1 < val2 =>
+ {
+ *max_value = max_nominee.clone().to_inexact();
+ }
+ (Precision::Exact(_), Precision::Absent) => {
+ let exact_max = mem::take(max_value);
+ *max_value = exact_max.to_inexact();
+ }
+ (Precision::Absent, Precision::Exact(_)) => {
+ *max_value = max_nominee.clone().to_inexact();
+ }
+ (Precision::Absent, Precision::Inexact(_)) => {
+ *max_value = max_nominee.clone();
+ }
+ _ => {}
+ }
+}
+
+/// If the given value is numerically lesser than the original minimum value,
+/// return the new minimum value with appropriate exactness information.
+fn set_min_if_lesser(
+ min_nominee: &Precision<ScalarValue>,
+ min_value: &mut Precision<ScalarValue>,
+) {
+ match (&min_value, min_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+ *min_value = min_nominee.clone();
+ }
+ (Precision::Exact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Exact(val2))
+ if val1 > val2 =>
+ {
+ *min_value = min_nominee.clone().to_inexact();
+ }
+ (Precision::Exact(_), Precision::Absent) => {
+ let exact_min = mem::take(min_value);
+ *min_value = exact_min.to_inexact();
+ }
+ (Precision::Absent, Precision::Exact(_)) => {
+ *min_value = min_nominee.clone().to_inexact();
+ }
+ (Precision::Absent, Precision::Inexact(_)) => {
+ *min_value = min_nominee.clone();
+ }
+ _ => {}
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]