This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9eddf473fa fix: CteWorkTable: properly apply TableProvider::scan 
projection argument (#18993)
9eddf473fa is described below

commit 9eddf473fad933e8d8b58fed63b1f01bfc951e06
Author: Thomas Tanon <[email protected]>
AuthorDate: Mon Dec 29 04:27:02 2025 +0100

    fix: CteWorkTable: properly apply TableProvider::scan projection argument (#18993)
    
    The projection argument was previously ignored.
    
    ## Which issue does this PR close?
    
    - Closes #18992.
    
    ## Rationale for this change
    
    All `TableProvider` implementations must support the `projection`
    argument of the `scan` method. This was not the case in `CteWorkTable`.
    
    ## What changes are included in this PR?
    
    Minimal implementation of the projection support. The projection is applied
    just before the plan node returns its results. It might be nice to push it
    further into the recursion implementation to reduce memory consumption, but
    I preferred to keep the fix minimal.
    
    ## Are these changes tested?
    
    I have not yet found a simple SQL query that triggers an error without
    this change. Some existing queries in `cte.slt` set a projection
    (i.e. not `None`), so the code is very likely working. There is also a
    unit test of the projection itself in `WorkTableExec`.
---
 datafusion/catalog/src/cte_worktable.rs    | 38 ++++++++-----
 datafusion/physical-plan/src/work_table.rs | 91 +++++++++++++++++++++++++++---
 2 files changed, 106 insertions(+), 23 deletions(-)

diff --git a/datafusion/catalog/src/cte_worktable.rs 
b/datafusion/catalog/src/cte_worktable.rs
index d6b2a45311..9565dcc601 100644
--- a/datafusion/catalog/src/cte_worktable.rs
+++ b/datafusion/catalog/src/cte_worktable.rs
@@ -17,20 +17,18 @@
 
 //! CteWorkTable implementation used for recursive queries
 
+use std::any::Any;
+use std::borrow::Cow;
 use std::sync::Arc;
-use std::{any::Any, borrow::Cow};
 
-use crate::Session;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use datafusion_physical_plan::work_table::WorkTableExec;
-
-use datafusion_physical_plan::ExecutionPlan;
-
 use datafusion_common::error::Result;
 use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, 
TableType};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::work_table::WorkTableExec;
 
-use crate::TableProvider;
+use crate::{ScanArgs, ScanResult, Session, TableProvider};
 
 /// The temporary working table where the previous iteration of a recursive 
query is stored
 /// Naming is based on PostgreSQL's implementation.
@@ -85,16 +83,28 @@ impl TableProvider for CteWorkTable {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
-        _projection: Option<&Vec<usize>>,
-        _filters: &[Expr],
-        _limit: Option<usize>,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // TODO: pushdown filters and limits
-        Ok(Arc::new(WorkTableExec::new(
+        let options = ScanArgs::default()
+            .with_projection(projection.map(|p| p.as_slice()))
+            .with_filters(Some(filters))
+            .with_limit(limit);
+        Ok(self.scan_with_args(state, options).await?.into_inner())
+    }
+
+    async fn scan_with_args<'a>(
+        &self,
+        _state: &dyn Session,
+        args: ScanArgs<'a>,
+    ) -> Result<ScanResult> {
+        Ok(ScanResult::new(Arc::new(WorkTableExec::new(
             self.name.clone(),
             Arc::clone(&self.table_schema),
-        )))
+            args.projection().map(|p| p.to_vec()),
+        )?)))
     }
 
     fn supports_filters_pushdown(
diff --git a/datafusion/physical-plan/src/work_table.rs 
b/datafusion/physical-plan/src/work_table.rs
index 87ca050e20..f1b9e3e88d 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -102,6 +102,8 @@ pub struct WorkTableExec {
     name: String,
     /// The schema of the stream
     schema: SchemaRef,
+    /// Projection to apply to build the output stream from the recursion state
+    projection: Option<Vec<usize>>,
     /// The work table
     work_table: Arc<WorkTable>,
     /// Execution metrics
@@ -112,15 +114,23 @@ pub struct WorkTableExec {
 
 impl WorkTableExec {
     /// Create a new execution plan for a worktable exec.
-    pub fn new(name: String, schema: SchemaRef) -> Self {
+    pub fn new(
+        name: String,
+        mut schema: SchemaRef,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self> {
+        if let Some(projection) = &projection {
+            schema = Arc::new(schema.project(projection)?);
+        }
         let cache = Self::compute_properties(Arc::clone(&schema));
-        Self {
+        Ok(Self {
             name: name.clone(),
             schema,
-            metrics: ExecutionPlanMetricsSet::new(),
+            projection,
             work_table: Arc::new(WorkTable::new(name)),
+            metrics: ExecutionPlanMetricsSet::new(),
             cache,
-        }
+        })
     }
 
     /// Ref to name
@@ -198,11 +208,22 @@ impl ExecutionPlan for WorkTableExec {
             0,
             "WorkTableExec got an invalid partition {partition} (expected 0)"
         );
-        let batch = self.work_table.take()?;
+        let ReservedBatches {
+            mut batches,
+            reservation,
+        } = self.work_table.take()?;
+        if let Some(projection) = &self.projection {
+            // We apply the projection
+            // TODO: it would be better to apply it as soon as possible and 
not only here
+            // TODO: an aggressive projection makes the memory reservation 
smaller, even if we do not edit it
+            batches = batches
+                .into_iter()
+                .map(|b| b.project(projection))
+                .collect::<Result<Vec<_>, _>>()?;
+        }
 
-        let stream =
-            MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), 
None)?
-                .with_reservation(batch.reservation);
+        let stream = MemoryStream::try_new(batches, Arc::clone(&self.schema), 
None)?
+            .with_reservation(reservation);
         Ok(Box::pin(cooperative(stream)))
     }
 
@@ -239,6 +260,7 @@ impl ExecutionPlan for WorkTableExec {
         Some(Arc::new(Self {
             name: self.name.clone(),
             schema: Arc::clone(&self.schema),
+            projection: self.projection.clone(),
             metrics: ExecutionPlanMetricsSet::new(),
             work_table,
             cache: self.cache.clone(),
@@ -249,8 +271,10 @@ impl ExecutionPlan for WorkTableExec {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::array::{ArrayRef, Int16Array, Int32Array, Int64Array};
+    use arrow_schema::{DataType, Field, Schema};
     use datafusion_execution::memory_pool::{MemoryConsumer, 
UnboundedMemoryPool};
+    use futures::StreamExt;
 
     #[test]
     fn test_work_table() {
@@ -283,4 +307,53 @@ mod tests {
         drop(memory_stream);
         assert_eq!(pool.reserved(), 0);
     }
+
+    #[tokio::test]
+    async fn test_work_table_exec() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int16, false),
+        ]));
+        let work_table_exec =
+            WorkTableExec::new("wt".into(), Arc::clone(&schema), Some(vec![2, 
1]))
+                .unwrap();
+
+        // We inject the work table
+        let work_table = Arc::new(WorkTable::new("wt".into()));
+        let work_table_exec = work_table_exec
+            .with_new_state(Arc::clone(&work_table) as _)
+            .unwrap();
+
+        // We update the work table
+        let pool = Arc::new(UnboundedMemoryPool::default()) as _;
+        let reservation = 
MemoryConsumer::new("test_work_table").register(&pool);
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                Arc::new(Int16Array::from(vec![1, 2, 3, 4, 5])),
+            ],
+        )
+        .unwrap();
+        work_table.update(ReservedBatches::new(vec![batch], reservation));
+
+        // We get back the batch from the work table
+        let returned_batch = work_table_exec
+            .execute(0, Arc::new(TaskContext::default()))
+            .unwrap()
+            .next()
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(
+            returned_batch,
+            RecordBatch::try_from_iter(vec![
+                ("c", Arc::new(Int16Array::from(vec![1, 2, 3, 4, 5])) as _),
+                ("b", Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as _),
+            ])
+            .unwrap()
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to