This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 97bd84acbb refactor: Move `Memtable` to catalog (#15459)
97bd84acbb is described below

commit 97bd84acbb96d17adee59f8e47b89db84fdf1d88
Author: logan-keede <[email protected]>
AuthorDate: Mon Mar 31 23:39:07 2025 +0530

    refactor: Move `Memtable` to catalog (#15459)
    
    * first iteration
    
    * fix: CI
    
    * move statistics
    
    * Merge relics
    
    * MemSink to datasource
    
    * clean stuff + backward compatibility
    
    * fix: cargo doc
    
    * fix:fmt
---
 datafusion/catalog/src/lib.rs                      |   2 +-
 datafusion/catalog/src/memory/mod.rs               |   6 +
 datafusion/catalog/src/memory/table.rs             | 296 ++++++++++++++++
 .../src/datasource/{memory.rs => memory_test.rs}   | 371 +--------------------
 datafusion/core/src/datasource/mod.rs              |   6 +-
 datafusion/core/src/datasource/statistics.rs       | 219 ------------
 datafusion/datasource/src/memory.rs                |  92 ++++-
 datafusion/datasource/src/mod.rs                   |   1 +
 datafusion/datasource/src/statistics.rs            | 195 +++++++++++
 9 files changed, 602 insertions(+), 586 deletions(-)

diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs
index f160bddd2b..0394b05277 100644
--- a/datafusion/catalog/src/lib.rs
+++ b/datafusion/catalog/src/lib.rs
@@ -50,7 +50,7 @@ pub use catalog::*;
 pub use datafusion_session::Session;
 pub use dynamic_file::catalog::*;
 pub use memory::{
-    MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
+    MemTable, MemoryCatalogProvider, MemoryCatalogProviderList, 
MemorySchemaProvider,
 };
 pub use r#async::*;
 pub use schema::*;
diff --git a/datafusion/catalog/src/memory/mod.rs 
b/datafusion/catalog/src/memory/mod.rs
index 4c5cf1a9ae..541d25b334 100644
--- a/datafusion/catalog/src/memory/mod.rs
+++ b/datafusion/catalog/src/memory/mod.rs
@@ -17,6 +17,12 @@
 
 pub(crate) mod catalog;
 pub(crate) mod schema;
+pub(crate) mod table;
 
 pub use catalog::*;
 pub use schema::*;
+pub use table::*;
+
+// backward compatibility
+pub use datafusion_datasource::memory::MemorySourceConfig;
+pub use datafusion_datasource::source::DataSourceExec;
diff --git a/datafusion/catalog/src/memory/table.rs 
b/datafusion/catalog/src/memory/table.rs
new file mode 100644
index 0000000000..81243e2c48
--- /dev/null
+++ b/datafusion/catalog/src/memory/table.rs
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::TableProvider;
+use datafusion_common::error::Result;
+use datafusion_expr::Expr;
+use datafusion_expr::TableType;
+use datafusion_physical_expr::create_physical_sort_exprs;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::{
+    common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, 
SchemaExt};
+use datafusion_common_runtime::JoinSet;
+use datafusion_datasource::memory::MemSink;
+use datafusion_datasource::memory::MemorySourceConfig;
+use datafusion_datasource::sink::DataSinkExec;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_expr::dml::InsertOp;
+use datafusion_expr::SortExpr;
+use datafusion_session::Session;
+
+use async_trait::async_trait;
+use futures::StreamExt;
+use log::debug;
+use parking_lot::Mutex;
+use tokio::sync::RwLock;
+
+// backward compatibility
+pub use datafusion_datasource::memory::PartitionData;
+
+/// In-memory data source for presenting a `Vec<RecordBatch>` as a
+/// data source that can be queried by DataFusion. This allows data to
+/// be pre-loaded into memory and then repeatedly queried without
+/// incurring additional file I/O overhead.
+#[derive(Debug)]
+pub struct MemTable {
+    schema: SchemaRef,
+    // batches used to be pub(crate), but it's needed to be public for the 
tests
+    pub batches: Vec<PartitionData>,
+    constraints: Constraints,
+    column_defaults: HashMap<String, Expr>,
+    /// Optional pre-known sort order(s). Must be `SortExpr`s.
+    /// inserting data into this table removes the order
+    pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
+}
+
+impl MemTable {
+    /// Create a new in-memory table from the provided schema and record 
batches
+    pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> 
Result<Self> {
+        for batches in partitions.iter().flatten() {
+            let batches_schema = batches.schema();
+            if !schema.contains(&batches_schema) {
+                debug!(
+                    "mem table schema does not contain batches schema. \
+                        Target_schema: {schema:?}. Batches Schema: 
{batches_schema:?}"
+                );
+                return plan_err!("Mismatch between schema and batches");
+            }
+        }
+
+        Ok(Self {
+            schema,
+            batches: partitions
+                .into_iter()
+                .map(|e| Arc::new(RwLock::new(e)))
+                .collect::<Vec<_>>(),
+            constraints: Constraints::empty(),
+            column_defaults: HashMap::new(),
+            sort_order: Arc::new(Mutex::new(vec![])),
+        })
+    }
+
+    /// Assign constraints
+    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+        self.constraints = constraints;
+        self
+    }
+
+    /// Assign column defaults
+    pub fn with_column_defaults(
+        mut self,
+        column_defaults: HashMap<String, Expr>,
+    ) -> Self {
+        self.column_defaults = column_defaults;
+        self
+    }
+
+    /// Specify an optional pre-known sort order(s). Must be `SortExpr`s.
+    ///
+    /// If the data is not sorted by this order, DataFusion may produce
+    /// incorrect results.
+    ///
+    /// DataFusion may take advantage of this ordering to omit sorts
+    /// or use more efficient algorithms.
+    ///
+    /// Note that multiple sort orders are supported, if some are known to be
+    /// equivalent,
+    pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
+        std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
+        self
+    }
+
+    /// Create a mem table by reading from another data source
+    pub async fn load(
+        t: Arc<dyn TableProvider>,
+        output_partitions: Option<usize>,
+        state: &dyn Session,
+    ) -> Result<Self> {
+        let schema = t.schema();
+        let constraints = t.constraints();
+        let exec = t.scan(state, None, &[], None).await?;
+        let partition_count = exec.output_partitioning().partition_count();
+
+        let mut join_set = JoinSet::new();
+
+        for part_idx in 0..partition_count {
+            let task = state.task_ctx();
+            let exec = Arc::clone(&exec);
+            join_set.spawn(async move {
+                let stream = exec.execute(part_idx, task)?;
+                common::collect(stream).await
+            });
+        }
+
+        let mut data: Vec<Vec<RecordBatch>> =
+            Vec::with_capacity(exec.output_partitioning().partition_count());
+
+        while let Some(result) = join_set.join_next().await {
+            match result {
+                Ok(res) => data.push(res?),
+                Err(e) => {
+                    if e.is_panic() {
+                        std::panic::resume_unwind(e.into_panic());
+                    } else {
+                        unreachable!();
+                    }
+                }
+            }
+        }
+
+        let mut exec = 
DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
+            &data,
+            Arc::clone(&schema),
+            None,
+        )?));
+        if let Some(cons) = constraints {
+            exec = exec.with_constraints(cons.clone());
+        }
+
+        if let Some(num_partitions) = output_partitions {
+            let exec = RepartitionExec::try_new(
+                Arc::new(exec),
+                Partitioning::RoundRobinBatch(num_partitions),
+            )?;
+
+            // execute and collect results
+            let mut output_partitions = vec![];
+            for i in 
0..exec.properties().output_partitioning().partition_count() {
+                // execute this *output* partition and collect all batches
+                let task_ctx = state.task_ctx();
+                let mut stream = exec.execute(i, task_ctx)?;
+                let mut batches = vec![];
+                while let Some(result) = stream.next().await {
+                    batches.push(result?);
+                }
+                output_partitions.push(batches);
+            }
+
+            return MemTable::try_new(Arc::clone(&schema), output_partitions);
+        }
+        MemTable::try_new(Arc::clone(&schema), data)
+    }
+}
+
+#[async_trait]
+impl TableProvider for MemTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn constraints(&self) -> Option<&Constraints> {
+        Some(&self.constraints)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut partitions = vec![];
+        for arc_inner_vec in self.batches.iter() {
+            let inner_vec = arc_inner_vec.read().await;
+            partitions.push(inner_vec.clone())
+        }
+
+        let mut source =
+            MemorySourceConfig::try_new(&partitions, self.schema(), 
projection.cloned())?;
+
+        let show_sizes = state.config_options().explain.show_sizes;
+        source = source.with_show_sizes(show_sizes);
+
+        // add sort information if present
+        let sort_order = self.sort_order.lock();
+        if !sort_order.is_empty() {
+            let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
+
+            let file_sort_order = sort_order
+                .iter()
+                .map(|sort_exprs| {
+                    create_physical_sort_exprs(
+                        sort_exprs,
+                        &df_schema,
+                        state.execution_props(),
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+            source = source.try_with_sort_information(file_sort_order)?;
+        }
+
+        Ok(DataSourceExec::from_data_source(source))
+    }
+
+    /// Returns an ExecutionPlan that inserts the execution results of a given 
[`ExecutionPlan`] into this [`MemTable`].
+    ///
+    /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
+    ///
+    /// # Arguments
+    ///
+    /// * `state` - The [`SessionState`] containing the context for executing 
the plan.
+    /// * `input` - The [`ExecutionPlan`] to execute and insert.
+    ///
+    /// # Returns
+    ///
+    /// * A plan that returns the number of rows written.
+    ///
+    /// [`SessionState`]: 
https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
+    async fn insert_into(
+        &self,
+        _state: &dyn Session,
+        input: Arc<dyn ExecutionPlan>,
+        insert_op: InsertOp,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // If we are inserting into the table, any sort order may be messed up 
so reset it here
+        *self.sort_order.lock() = vec![];
+
+        // Create a physical plan from the logical plan.
+        // Check that the schema of the plan matches the schema of this table.
+        self.schema()
+            .logically_equivalent_names_and_types(&input.schema())?;
+
+        if insert_op != InsertOp::Append {
+            return not_impl_err!("{insert_op} not implemented for MemoryTable 
yet");
+        }
+        let sink = MemSink::try_new(self.batches.clone(), 
Arc::clone(&self.schema))?;
+        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
+    }
+
+    fn get_column_default(&self, column: &str) -> Option<&Expr> {
+        self.column_defaults.get(column)
+    }
+}
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory_test.rs
similarity index 58%
rename from datafusion/core/src/datasource/memory.rs
rename to datafusion/core/src/datasource/memory_test.rs
index 0288cd3e8b..381000ab8e 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory_test.rs
@@ -15,378 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
-
-use std::any::Any;
-use std::collections::HashMap;
-use std::fmt::{self, Debug};
-use std::sync::Arc;
-
-use crate::datasource::{TableProvider, TableType};
-use crate::error::Result;
-use crate::logical_expr::Expr;
-use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::{
-    common, DisplayAs, DisplayFormatType, ExecutionPlan, 
ExecutionPlanProperties,
-    Partitioning, SendableRecordBatchStream,
-};
-use crate::physical_planner::create_physical_sort_exprs;
-
-use arrow::datatypes::SchemaRef;
-use arrow::record_batch::RecordBatch;
-use datafusion_catalog::Session;
-use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, 
SchemaExt};
-use datafusion_common_runtime::JoinSet;
-pub use datafusion_datasource::memory::MemorySourceConfig;
-use datafusion_datasource::sink::{DataSink, DataSinkExec};
-pub use datafusion_datasource::source::DataSourceExec;
-use datafusion_execution::TaskContext;
-use datafusion_expr::dml::InsertOp;
-use datafusion_expr::SortExpr;
-
-use async_trait::async_trait;
-use futures::StreamExt;
-use log::debug;
-use parking_lot::Mutex;
-use tokio::sync::RwLock;
-
-/// Type alias for partition data
-pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
-
-/// In-memory data source for presenting a `Vec<RecordBatch>` as a
-/// data source that can be queried by DataFusion. This allows data to
-/// be pre-loaded into memory and then repeatedly queried without
-/// incurring additional file I/O overhead.
-#[derive(Debug)]
-pub struct MemTable {
-    schema: SchemaRef,
-    pub(crate) batches: Vec<PartitionData>,
-    constraints: Constraints,
-    column_defaults: HashMap<String, Expr>,
-    /// Optional pre-known sort order(s). Must be `SortExpr`s.
-    /// inserting data into this table removes the order
-    pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
-}
-
-impl MemTable {
-    /// Create a new in-memory table from the provided schema and record 
batches
-    pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> 
Result<Self> {
-        for batches in partitions.iter().flatten() {
-            let batches_schema = batches.schema();
-            if !schema.contains(&batches_schema) {
-                debug!(
-                    "mem table schema does not contain batches schema. \
-                        Target_schema: {schema:?}. Batches Schema: 
{batches_schema:?}"
-                );
-                return plan_err!("Mismatch between schema and batches");
-            }
-        }
-
-        Ok(Self {
-            schema,
-            batches: partitions
-                .into_iter()
-                .map(|e| Arc::new(RwLock::new(e)))
-                .collect::<Vec<_>>(),
-            constraints: Constraints::empty(),
-            column_defaults: HashMap::new(),
-            sort_order: Arc::new(Mutex::new(vec![])),
-        })
-    }
-
-    /// Assign constraints
-    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
-        self.constraints = constraints;
-        self
-    }
-
-    /// Assign column defaults
-    pub fn with_column_defaults(
-        mut self,
-        column_defaults: HashMap<String, Expr>,
-    ) -> Self {
-        self.column_defaults = column_defaults;
-        self
-    }
-
-    /// Specify an optional pre-known sort order(s). Must be `SortExpr`s.
-    ///
-    /// If the data is not sorted by this order, DataFusion may produce
-    /// incorrect results.
-    ///
-    /// DataFusion may take advantage of this ordering to omit sorts
-    /// or use more efficient algorithms.
-    ///
-    /// Note that multiple sort orders are supported, if some are known to be
-    /// equivalent,
-    pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
-        std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
-        self
-    }
-
-    /// Create a mem table by reading from another data source
-    pub async fn load(
-        t: Arc<dyn TableProvider>,
-        output_partitions: Option<usize>,
-        state: &dyn Session,
-    ) -> Result<Self> {
-        let schema = t.schema();
-        let constraints = t.constraints();
-        let exec = t.scan(state, None, &[], None).await?;
-        let partition_count = exec.output_partitioning().partition_count();
-
-        let mut join_set = JoinSet::new();
-
-        for part_idx in 0..partition_count {
-            let task = state.task_ctx();
-            let exec = Arc::clone(&exec);
-            join_set.spawn(async move {
-                let stream = exec.execute(part_idx, task)?;
-                common::collect(stream).await
-            });
-        }
-
-        let mut data: Vec<Vec<RecordBatch>> =
-            Vec::with_capacity(exec.output_partitioning().partition_count());
-
-        while let Some(result) = join_set.join_next().await {
-            match result {
-                Ok(res) => data.push(res?),
-                Err(e) => {
-                    if e.is_panic() {
-                        std::panic::resume_unwind(e.into_panic());
-                    } else {
-                        unreachable!();
-                    }
-                }
-            }
-        }
-
-        let mut exec = 
DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
-            &data,
-            Arc::clone(&schema),
-            None,
-        )?));
-        if let Some(cons) = constraints {
-            exec = exec.with_constraints(cons.clone());
-        }
-
-        if let Some(num_partitions) = output_partitions {
-            let exec = RepartitionExec::try_new(
-                Arc::new(exec),
-                Partitioning::RoundRobinBatch(num_partitions),
-            )?;
-
-            // execute and collect results
-            let mut output_partitions = vec![];
-            for i in 
0..exec.properties().output_partitioning().partition_count() {
-                // execute this *output* partition and collect all batches
-                let task_ctx = state.task_ctx();
-                let mut stream = exec.execute(i, task_ctx)?;
-                let mut batches = vec![];
-                while let Some(result) = stream.next().await {
-                    batches.push(result?);
-                }
-                output_partitions.push(batches);
-            }
-
-            return MemTable::try_new(Arc::clone(&schema), output_partitions);
-        }
-        MemTable::try_new(Arc::clone(&schema), data)
-    }
-}
-
-#[async_trait]
-impl TableProvider for MemTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn constraints(&self) -> Option<&Constraints> {
-        Some(&self.constraints)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    async fn scan(
-        &self,
-        state: &dyn Session,
-        projection: Option<&Vec<usize>>,
-        _filters: &[Expr],
-        _limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut partitions = vec![];
-        for arc_inner_vec in self.batches.iter() {
-            let inner_vec = arc_inner_vec.read().await;
-            partitions.push(inner_vec.clone())
-        }
-
-        let mut source =
-            MemorySourceConfig::try_new(&partitions, self.schema(), 
projection.cloned())?;
-
-        let show_sizes = state.config_options().explain.show_sizes;
-        source = source.with_show_sizes(show_sizes);
-
-        // add sort information if present
-        let sort_order = self.sort_order.lock();
-        if !sort_order.is_empty() {
-            let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
-
-            let file_sort_order = sort_order
-                .iter()
-                .map(|sort_exprs| {
-                    create_physical_sort_exprs(
-                        sort_exprs,
-                        &df_schema,
-                        state.execution_props(),
-                    )
-                })
-                .collect::<Result<Vec<_>>>()?;
-            source = source.try_with_sort_information(file_sort_order)?;
-        }
-
-        Ok(DataSourceExec::from_data_source(source))
-    }
-
-    /// Returns an ExecutionPlan that inserts the execution results of a given 
[`ExecutionPlan`] into this [`MemTable`].
-    ///
-    /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
-    ///
-    /// # Arguments
-    ///
-    /// * `state` - The [`SessionState`] containing the context for executing 
the plan.
-    /// * `input` - The [`ExecutionPlan`] to execute and insert.
-    ///
-    /// # Returns
-    ///
-    /// * A plan that returns the number of rows written.
-    ///
-    /// [`SessionState`]: crate::execution::context::SessionState
-    async fn insert_into(
-        &self,
-        _state: &dyn Session,
-        input: Arc<dyn ExecutionPlan>,
-        insert_op: InsertOp,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        // If we are inserting into the table, any sort order may be messed up 
so reset it here
-        *self.sort_order.lock() = vec![];
-
-        // Create a physical plan from the logical plan.
-        // Check that the schema of the plan matches the schema of this table.
-        self.schema()
-            .logically_equivalent_names_and_types(&input.schema())?;
-
-        if insert_op != InsertOp::Append {
-            return not_impl_err!("{insert_op} not implemented for MemoryTable 
yet");
-        }
-        let sink = MemSink::try_new(self.batches.clone(), 
Arc::clone(&self.schema))?;
-        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
-    }
-
-    fn get_column_default(&self, column: &str) -> Option<&Expr> {
-        self.column_defaults.get(column)
-    }
-}
-
-/// Implements for writing to a [`MemTable`]
-struct MemSink {
-    /// Target locations for writing data
-    batches: Vec<PartitionData>,
-    schema: SchemaRef,
-}
-
-impl Debug for MemSink {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("MemSink")
-            .field("num_partitions", &self.batches.len())
-            .finish()
-    }
-}
-
-impl DisplayAs for MemSink {
-    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
-        match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                let partition_count = self.batches.len();
-                write!(f, "MemoryTable (partitions={partition_count})")
-            }
-            DisplayFormatType::TreeRender => {
-                // TODO: collect info
-                write!(f, "")
-            }
-        }
-    }
-}
-
-impl MemSink {
-    /// Creates a new [`MemSink`].
-    ///
-    /// The caller is responsible for ensuring that there is at least one 
partition to insert into.
-    fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> Result<Self> 
{
-        if batches.is_empty() {
-            return plan_err!("Cannot insert into MemTable with zero 
partitions");
-        }
-        Ok(Self { batches, schema })
-    }
-}
-
-#[async_trait]
-impl DataSink for MemSink {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> &SchemaRef {
-        &self.schema
-    }
-
-    async fn write_all(
-        &self,
-        mut data: SendableRecordBatchStream,
-        _context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        let num_partitions = self.batches.len();
-
-        // buffer up the data round robin style into num_partitions
-
-        let mut new_batches = vec![vec![]; num_partitions];
-        let mut i = 0;
-        let mut row_count = 0;
-        while let Some(batch) = data.next().await.transpose()? {
-            row_count += batch.num_rows();
-            new_batches[i].push(batch);
-            i = (i + 1) % num_partitions;
-        }
-
-        // write the outputs into the batches
-        for (target, mut batches) in 
self.batches.iter().zip(new_batches.into_iter()) {
-            // Append all the new batches in one go to minimize locking 
overhead
-            target.write().await.append(&mut batches);
-        }
-
-        Ok(row_count as u64)
-    }
-}
-
 #[cfg(test)]
 mod tests {
 
-    use super::*;
+    use crate::datasource::MemTable;
     use crate::datasource::{provider_as_source, DefaultTableSource};
     use crate::physical_plan::collect;
     use crate::prelude::SessionContext;
-
     use arrow::array::{AsArray, Int32Array};
     use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
     use arrow::error::ArrowError;
-    use datafusion_common::DataFusionError;
+    use arrow::record_batch::RecordBatch;
+    use arrow_schema::SchemaRef;
+    use datafusion_catalog::TableProvider;
+    use datafusion_common::{DataFusionError, Result};
+    use datafusion_expr::dml::InsertOp;
     use datafusion_expr::LogicalPlanBuilder;
+    use futures::StreamExt;
+    use std::collections::HashMap;
+    use std::sync::Arc;
 
     #[tokio::test]
     async fn test_with_projection() -> Result<()> {
diff --git a/datafusion/core/src/datasource/mod.rs 
b/datafusion/core/src/datasource/mod.rs
index 35a451cbc8..a195c9a882 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -24,10 +24,9 @@ pub mod empty;
 pub mod file_format;
 pub mod listing;
 pub mod listing_table_factory;
-pub mod memory;
+mod memory_test;
 pub mod physical_plan;
 pub mod provider;
-mod statistics;
 mod view_test;
 
 // backwards compatibility
@@ -40,14 +39,15 @@ pub use crate::catalog::TableProvider;
 pub use crate::logical_expr::TableType;
 pub use datafusion_catalog::cte_worktable;
 pub use datafusion_catalog::default_table_source;
+pub use datafusion_catalog::memory;
 pub use datafusion_catalog::stream;
 pub use datafusion_catalog::view;
+pub use datafusion_datasource::get_statistics_with_limit;
 pub use datafusion_datasource::schema_adapter;
 pub use datafusion_datasource::sink;
 pub use datafusion_datasource::source;
 pub use datafusion_execution::object_store;
 pub use datafusion_physical_expr::create_ordering;
-pub use statistics::get_statistics_with_limit;
 
 #[cfg(all(test, feature = "parquet"))]
 mod tests {
diff --git a/datafusion/core/src/datasource/statistics.rs 
b/datafusion/core/src/datasource/statistics.rs
deleted file mode 100644
index cf283ecee0..0000000000
--- a/datafusion/core/src/datasource/statistics.rs
+++ /dev/null
@@ -1,219 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::mem;
-use std::sync::Arc;
-
-use futures::{Stream, StreamExt};
-
-use crate::arrow::datatypes::SchemaRef;
-use crate::error::Result;
-use crate::physical_plan::{ColumnStatistics, Statistics};
-use datafusion_common::stats::Precision;
-use datafusion_common::ScalarValue;
-use datafusion_datasource::file_groups::FileGroup;
-
-use super::listing::PartitionedFile;
-
-/// Get all files as well as the file level summary statistics (no statistic 
for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files. 
Needed to read up to
-/// `limit` number of rows. `collect_stats` is passed down from the 
configuration parameter on
-/// `ListingTable`. If it is false we only construct bare statistics and skip 
a potentially expensive
-///  call to `multiunzip` for constructing file level summary statistics.
-pub async fn get_statistics_with_limit(
-    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
-    file_schema: SchemaRef,
-    limit: Option<usize>,
-    collect_stats: bool,
-) -> Result<(FileGroup, Statistics)> {
-    let mut result_files = FileGroup::default();
-    // These statistics can be calculated as long as at least one file provides
-    // useful information. If none of the files provides any information, then
-    // they will end up having `Precision::Absent` values. Throughout 
calculations,
-    // missing values will be imputed as:
-    // - zero for summations, and
-    // - neutral element for extreme points.
-    let size = file_schema.fields().len();
-    let mut col_stats_set = vec![ColumnStatistics::default(); size];
-    let mut num_rows = Precision::<usize>::Absent;
-    let mut total_byte_size = Precision::<usize>::Absent;
-
-    // Fusing the stream allows us to call next safely even once it is 
finished.
-    let mut all_files = Box::pin(all_files.fuse());
-
-    if let Some(first_file) = all_files.next().await {
-        let (mut file, file_stats) = first_file?;
-        file.statistics = Some(file_stats.as_ref().clone());
-        result_files.push(file);
-
-        // First file, we set them directly from the file statistics.
-        num_rows = file_stats.num_rows;
-        total_byte_size = file_stats.total_byte_size;
-        for (index, file_column) in
-            file_stats.column_statistics.clone().into_iter().enumerate()
-        {
-            col_stats_set[index].null_count = file_column.null_count;
-            col_stats_set[index].max_value = file_column.max_value;
-            col_stats_set[index].min_value = file_column.min_value;
-            col_stats_set[index].sum_value = file_column.sum_value;
-        }
-
-        // If the number of rows exceeds the limit, we can stop processing
-        // files. This only applies when we know the number of rows. It also
-        // currently ignores tables that have no statistics regarding the
-        // number of rows.
-        let conservative_num_rows = match num_rows {
-            Precision::Exact(nr) => nr,
-            _ => usize::MIN,
-        };
-        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
-            while let Some(current) = all_files.next().await {
-                let (mut file, file_stats) = current?;
-                file.statistics = Some(file_stats.as_ref().clone());
-                result_files.push(file);
-                if !collect_stats {
-                    continue;
-                }
-
-                // We accumulate the number of rows, total byte size and null
-                // counts across all the files in question. If any file does 
not
-                // provide any information or provides an inexact value, we 
demote
-                // the statistic precision to inexact.
-                num_rows = add_row_stats(file_stats.num_rows, num_rows);
-
-                total_byte_size =
-                    add_row_stats(file_stats.total_byte_size, total_byte_size);
-
-                for (file_col_stats, col_stats) in file_stats
-                    .column_statistics
-                    .iter()
-                    .zip(col_stats_set.iter_mut())
-                {
-                    let ColumnStatistics {
-                        null_count: file_nc,
-                        max_value: file_max,
-                        min_value: file_min,
-                        sum_value: file_sum,
-                        distinct_count: _,
-                    } = file_col_stats;
-
-                    col_stats.null_count = add_row_stats(*file_nc, 
col_stats.null_count);
-                    set_max_if_greater(file_max, &mut col_stats.max_value);
-                    set_min_if_lesser(file_min, &mut col_stats.min_value);
-                    col_stats.sum_value = file_sum.add(&col_stats.sum_value);
-                }
-
-                // If the number of rows exceeds the limit, we can stop 
processing
-                // files. This only applies when we know the number of rows. 
It also
-                // currently ignores tables that have no statistics regarding 
the
-                // number of rows.
-                if num_rows.get_value().unwrap_or(&usize::MIN)
-                    > &limit.unwrap_or(usize::MAX)
-                {
-                    break;
-                }
-            }
-        }
-    };
-
-    let mut statistics = Statistics {
-        num_rows,
-        total_byte_size,
-        column_statistics: col_stats_set,
-    };
-    if all_files.next().await.is_some() {
-        // If we still have files in the stream, it means that the limit kicked
-        // in, and the statistic could have been different had we processed the
-        // files in a different order.
-        statistics = statistics.to_inexact()
-    }
-
-    Ok((result_files, statistics))
-}
-
-fn add_row_stats(
-    file_num_rows: Precision<usize>,
-    num_rows: Precision<usize>,
-) -> Precision<usize> {
-    match (file_num_rows, &num_rows) {
-        (Precision::Absent, _) => num_rows.to_inexact(),
-        (lhs, Precision::Absent) => lhs.to_inexact(),
-        (lhs, rhs) => lhs.add(rhs),
-    }
-}
-
-/// If the given value is numerically greater than the original maximum value,
-/// return the new maximum value with appropriate exactness information.
-fn set_max_if_greater(
-    max_nominee: &Precision<ScalarValue>,
-    max_value: &mut Precision<ScalarValue>,
-) {
-    match (&max_value, max_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
-            *max_value = max_nominee.clone();
-        }
-        (Precision::Exact(val1), Precision::Inexact(val2))
-        | (Precision::Inexact(val1), Precision::Inexact(val2))
-        | (Precision::Inexact(val1), Precision::Exact(val2))
-            if val1 < val2 =>
-        {
-            *max_value = max_nominee.clone().to_inexact();
-        }
-        (Precision::Exact(_), Precision::Absent) => {
-            let exact_max = mem::take(max_value);
-            *max_value = exact_max.to_inexact();
-        }
-        (Precision::Absent, Precision::Exact(_)) => {
-            *max_value = max_nominee.clone().to_inexact();
-        }
-        (Precision::Absent, Precision::Inexact(_)) => {
-            *max_value = max_nominee.clone();
-        }
-        _ => {}
-    }
-}
-
-/// If the given value is numerically lesser than the original minimum value,
-/// return the new minimum value with appropriate exactness information.
-fn set_min_if_lesser(
-    min_nominee: &Precision<ScalarValue>,
-    min_value: &mut Precision<ScalarValue>,
-) {
-    match (&min_value, min_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
-            *min_value = min_nominee.clone();
-        }
-        (Precision::Exact(val1), Precision::Inexact(val2))
-        | (Precision::Inexact(val1), Precision::Inexact(val2))
-        | (Precision::Inexact(val1), Precision::Exact(val2))
-            if val1 > val2 =>
-        {
-            *min_value = min_nominee.clone().to_inexact();
-        }
-        (Precision::Exact(_), Precision::Absent) => {
-            let exact_min = mem::take(min_value);
-            *min_value = exact_min.to_inexact();
-        }
-        (Precision::Absent, Precision::Exact(_)) => {
-            *min_value = min_nominee.clone().to_inexact();
-        }
-        (Precision::Absent, Precision::Inexact(_)) => {
-            *min_value = min_nominee.clone();
-        }
-        _ => {}
-    }
-}
diff --git a/datafusion/datasource/src/memory.rs 
b/datafusion/datasource/src/memory.rs
index f2e36672cd..6d0e16ef4b 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -19,9 +19,12 @@
 
 use std::any::Any;
 use std::fmt;
+use std::fmt::Debug;
 use std::sync::Arc;
 
+use crate::sink::DataSink;
 use crate::source::{DataSource, DataSourceExec};
+use async_trait::async_trait;
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::memory::MemoryStream;
 use datafusion_physical_plan::projection::{
@@ -42,6 +45,8 @@ use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use futures::StreamExt;
+use tokio::sync::RwLock;
 
 /// Execution plan for reading in-memory batches of data
 #[derive(Clone)]
@@ -62,7 +67,7 @@ pub struct MemoryExec {
 }
 
 #[allow(unused, deprecated)]
-impl fmt::Debug for MemoryExec {
+impl Debug for MemoryExec {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         self.inner.fmt_as(DisplayFormatType::Default, f)
     }
@@ -720,6 +725,91 @@ impl MemorySourceConfig {
     }
 }
 
+/// Type alias for partition data
+pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+/// Implements for writing to a [`MemTable`]
+///
+/// [`MemTable`]: 
<https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html>
+pub struct MemSink {
+    /// Target locations for writing data
+    batches: Vec<PartitionData>,
+    schema: SchemaRef,
+}
+
+impl Debug for MemSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MemSink")
+            .field("num_partitions", &self.batches.len())
+            .finish()
+    }
+}
+
+impl DisplayAs for MemSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let partition_count = self.batches.len();
+                write!(f, "MemoryTable (partitions={partition_count})")
+            }
+            DisplayFormatType::TreeRender => {
+                // TODO: collect info
+                write!(f, "")
+            }
+        }
+    }
+}
+
+impl MemSink {
+    /// Creates a new [`MemSink`].
+    ///
+    /// The caller is responsible for ensuring that there is at least one 
partition to insert into.
+    pub fn try_new(batches: Vec<PartitionData>, schema: SchemaRef) -> 
Result<Self> {
+        if batches.is_empty() {
+            return plan_err!("Cannot insert into MemTable with zero 
partitions");
+        }
+        Ok(Self { batches, schema })
+    }
+}
+
+#[async_trait]
+impl DataSink for MemSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    async fn write_all(
+        &self,
+        mut data: SendableRecordBatchStream,
+        _context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let num_partitions = self.batches.len();
+
+        // buffer up the data round robin style into num_partitions
+
+        let mut new_batches = vec![vec![]; num_partitions];
+        let mut i = 0;
+        let mut row_count = 0;
+        while let Some(batch) = data.next().await.transpose()? {
+            row_count += batch.num_rows();
+            new_batches[i].push(batch);
+            i = (i + 1) % num_partitions;
+        }
+
+        // write the outputs into the batches
+        for (target, mut batches) in 
self.batches.iter().zip(new_batches.into_iter()) {
+            // Append all the new batches in one go to minimize locking 
overhead
+            target.write().await.append(&mut batches);
+        }
+
+        Ok(row_count as u64)
+    }
+}
+
 #[cfg(test)]
 mod memory_source_tests {
     use std::sync::Arc;
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index fb119d1b3d..1e7ea1255d 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -60,6 +60,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
+pub use statistics::get_statistics_with_limit;
 
 /// Stream of files get listed from object store
 pub type PartitionedFileStream =
diff --git a/datafusion/datasource/src/statistics.rs 
b/datafusion/datasource/src/statistics.rs
index cd002a9668..801315568a 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -20,8 +20,11 @@
 //! Currently, this module houses code to sort file groups if they are 
non-overlapping with
 //! respect to the required sort order. See [`MinMaxStatistics`]
 
+use futures::{Stream, StreamExt};
+use std::mem;
 use std::sync::Arc;
 
+use crate::file_groups::FileGroup;
 use crate::PartitionedFile;
 
 use arrow::array::RecordBatch;
@@ -30,9 +33,12 @@ use arrow::{
     compute::SortColumn,
     row::{Row, Rows},
 };
+use datafusion_common::stats::Precision;
+use datafusion_common::ScalarValue;
 use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, 
Result};
 use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::{ColumnStatistics, Statistics};
 
 /// A normalized representation of file min/max statistics that allows for 
efficient sorting & comparison.
 /// The min/max values are ordered by [`Self::sort_order`].
@@ -281,3 +287,192 @@ fn sort_columns_from_physical_sort_exprs(
         .map(|expr| expr.expr.as_any().downcast_ref::<Column>())
         .collect::<Option<Vec<_>>>()
 }
+
+/// Get all files as well as the file level summary statistics (no statistic 
for partition columns).
+/// If the optional `limit` is provided, includes only sufficient files. 
Needed to read up to
+/// `limit` number of rows. `collect_stats` is passed down from the 
configuration parameter on
+/// `ListingTable`. If it is false we only construct bare statistics and skip 
a potentially expensive
+///  call to `multiunzip` for constructing file level summary statistics.
+pub async fn get_statistics_with_limit(
+    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
+    file_schema: SchemaRef,
+    limit: Option<usize>,
+    collect_stats: bool,
+) -> Result<(FileGroup, Statistics)> {
+    let mut result_files = FileGroup::default();
+    // These statistics can be calculated as long as at least one file provides
+    // useful information. If none of the files provides any information, then
+    // they will end up having `Precision::Absent` values. Throughout 
calculations,
+    // missing values will be imputed as:
+    // - zero for summations, and
+    // - neutral element for extreme points.
+    let size = file_schema.fields().len();
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    // Fusing the stream allows us to call next safely even once it is 
finished.
+    let mut all_files = Box::pin(all_files.fuse());
+
+    if let Some(first_file) = all_files.next().await {
+        let (mut file, file_stats) = first_file?;
+        file.statistics = Some(file_stats.as_ref().clone());
+        result_files.push(file);
+
+        // First file, we set them directly from the file statistics.
+        num_rows = file_stats.num_rows;
+        total_byte_size = file_stats.total_byte_size;
+        for (index, file_column) in
+            file_stats.column_statistics.clone().into_iter().enumerate()
+        {
+            col_stats_set[index].null_count = file_column.null_count;
+            col_stats_set[index].max_value = file_column.max_value;
+            col_stats_set[index].min_value = file_column.min_value;
+            col_stats_set[index].sum_value = file_column.sum_value;
+        }
+
+        // If the number of rows exceeds the limit, we can stop processing
+        // files. This only applies when we know the number of rows. It also
+        // currently ignores tables that have no statistics regarding the
+        // number of rows.
+        let conservative_num_rows = match num_rows {
+            Precision::Exact(nr) => nr,
+            _ => usize::MIN,
+        };
+        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+            while let Some(current) = all_files.next().await {
+                let (mut file, file_stats) = current?;
+                file.statistics = Some(file_stats.as_ref().clone());
+                result_files.push(file);
+                if !collect_stats {
+                    continue;
+                }
+
+                // We accumulate the number of rows, total byte size and null
+                // counts across all the files in question. If any file does 
not
+                // provide any information or provides an inexact value, we 
demote
+                // the statistic precision to inexact.
+                num_rows = add_row_stats(file_stats.num_rows, num_rows);
+
+                total_byte_size =
+                    add_row_stats(file_stats.total_byte_size, total_byte_size);
+
+                for (file_col_stats, col_stats) in file_stats
+                    .column_statistics
+                    .iter()
+                    .zip(col_stats_set.iter_mut())
+                {
+                    let ColumnStatistics {
+                        null_count: file_nc,
+                        max_value: file_max,
+                        min_value: file_min,
+                        sum_value: file_sum,
+                        distinct_count: _,
+                    } = file_col_stats;
+
+                    col_stats.null_count = add_row_stats(*file_nc, 
col_stats.null_count);
+                    set_max_if_greater(file_max, &mut col_stats.max_value);
+                    set_min_if_lesser(file_min, &mut col_stats.min_value);
+                    col_stats.sum_value = file_sum.add(&col_stats.sum_value);
+                }
+
+                // If the number of rows exceeds the limit, we can stop 
processing
+                // files. This only applies when we know the number of rows. 
It also
+                // currently ignores tables that have no statistics regarding 
the
+                // number of rows.
+                if num_rows.get_value().unwrap_or(&usize::MIN)
+                    > &limit.unwrap_or(usize::MAX)
+                {
+                    break;
+                }
+            }
+        }
+    };
+
+    let mut statistics = Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    };
+    if all_files.next().await.is_some() {
+        // If we still have files in the stream, it means that the limit kicked
+        // in, and the statistic could have been different had we processed the
+        // files in a different order.
+        statistics = statistics.to_inexact()
+    }
+
+    Ok((result_files, statistics))
+}
+
+fn add_row_stats(
+    file_num_rows: Precision<usize>,
+    num_rows: Precision<usize>,
+) -> Precision<usize> {
+    match (file_num_rows, &num_rows) {
+        (Precision::Absent, _) => num_rows.to_inexact(),
+        (lhs, Precision::Absent) => lhs.to_inexact(),
+        (lhs, rhs) => lhs.add(rhs),
+    }
+}
+
+/// If the given value is numerically greater than the original maximum value,
+/// return the new maximum value with appropriate exactness information.
+fn set_max_if_greater(
+    max_nominee: &Precision<ScalarValue>,
+    max_value: &mut Precision<ScalarValue>,
+) {
+    match (&max_value, max_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+            *max_value = max_nominee.clone();
+        }
+        (Precision::Exact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Exact(val2))
+            if val1 < val2 =>
+        {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_max = mem::take(max_value);
+            *max_value = exact_max.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *max_value = max_nominee.clone();
+        }
+        _ => {}
+    }
+}
+
+/// If the given value is numerically lesser than the original minimum value,
+/// return the new minimum value with appropriate exactness information.
+fn set_min_if_lesser(
+    min_nominee: &Precision<ScalarValue>,
+    min_value: &mut Precision<ScalarValue>,
+) {
+    match (&min_value, min_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+            *min_value = min_nominee.clone();
+        }
+        (Precision::Exact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Exact(val2))
+            if val1 > val2 =>
+        {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_min = mem::take(min_value);
+            *min_value = exact_min.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *min_value = min_nominee.clone();
+        }
+        _ => {}
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to