askalt opened a new issue, #19351:
URL: https://github.com/apache/datafusion/issues/19351

   ### Is your feature request related to a problem or challenge?
   
   This issue is a reincarnation of https://github.com/apache/datafusion/issues/14342.
   
   We are working on a DBMS that uses DataFusion as its SQL query engine. A general challenge is that building the physical plan is a bottleneck. Its cost can be reduced to some degree, but as the number of optimizations grows, query latency becomes hard to control.
   
   It would be beneficial to reuse a single plan across many executions (like a prepared statement). The obstacle is technical rather than conceptual: the current implementation stores execution-stage state in the plan itself and generally does not assume that `execute(...)` can be called more than once, or concurrently, for the same `Arc<dyn ExecutionPlan>`. For instance, the `ExecutionPlan` trait exposes the following state-related methods:
   
   ```rust
       // State-related methods of the `ExecutionPlan` trait
       // (excerpt; default method bodies omitted).
       fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>>;

       fn metrics(&self) -> Option<MetricsSet>;

       fn with_new_state(
           &self,
           _state: Arc<dyn Any + Send + Sync>,
       ) -> Option<Arc<dyn ExecutionPlan>>;
   ```  
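
   To make the problem concrete, here is a dependency-free sketch, not DataFusion code: the hypothetical `CachingJoinNode` caches its build side inside the node with `std::sync::OnceLock`, similar in spirit to the `left_fut` that the `CrossJoinExec` example below moves out of the plan. A second execution of the same node silently reuses the first execution's state; the only way out is to build a fresh node, modeled here by a `reset_state` analogue that returns a new node.

   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::{Arc, OnceLock};

   /// Hypothetical plan node that keeps execution-stage state inside itself.
   struct CachingJoinNode {
       /// Build-side rows, computed by the first `execute` call only.
       build_side: OnceLock<Vec<i32>>,
       /// How many times the build side was actually computed.
       builds: AtomicUsize,
   }

   impl CachingJoinNode {
       fn new() -> Self {
           Self {
               build_side: OnceLock::new(),
               builds: AtomicUsize::new(0),
           }
       }

       /// "Executes" the node; `input` stands in for the build-side child's
       /// output, which may differ from one execution to the next.
       fn execute(&self, input: &[i32]) -> usize {
           let build = self.build_side.get_or_init(|| {
               self.builds.fetch_add(1, Ordering::SeqCst);
               input.to_vec()
           });
           build.len()
       }

       /// Analogue of `reset_state`: returns a new node with empty state.
       fn reset_state(self: Arc<Self>) -> Arc<Self> {
           Arc::new(Self::new())
       }
   }

   fn main() {
       let plan = Arc::new(CachingJoinNode::new());
       // First execution: the build side is computed from 3 rows.
       assert_eq!(plan.execute(&[1, 2, 3]), 3);
       // Second execution with 1 row still sees the stale 3-row state.
       assert_eq!(plan.execute(&[42]), 3);
       assert_eq!(plan.builds.load(Ordering::SeqCst), 1);

       // Resetting yields a fresh node, so the next execution sees new input.
       let plan = plan.reset_state();
       assert_eq!(plan.execute(&[42]), 1);
       println!("ok");
   }
   ```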
   
   We implemented a reusable-plans feature (based on release 42) in our [fork](https://github.com/tarantool/datafusion/tree/release-42.0.0). As it showed good results, we are now working on porting it upstream.
   
   
   ### Describe the solution you'd like
   
   Consider the following trait, which could be used to represent a plan state:
   
   ```rust
   /// Generic plan state.
   pub trait PlanState: std::fmt::Debug + std::any::Any + Send + Sync {
       /// Returns this state as `Any`, so that it can be downcast to a concrete type.
       fn as_any(&self) -> &dyn std::any::Any;

       /// Returns the plan metrics, if any.
       fn metrics(&self) -> Option<crate::metrics::ExecutionPlanMetricsSet>;
   }
   ```
   
   We suggest storing plan state in the existing `TaskContext` struct, which is documented as holding the state of a single query's execution:
   
   ```rust
   /// A [`TaskContext`] contains the state required during a single query's
   /// execution. Please see the documentation on [`SessionContext`] for more
   /// information.
   ```
   
   `TaskContext` could store a mapping from each execution plan node to its state. Since a `TaskContext` is used for a single query execution only, the map can be keyed by the plan's raw pointer address:
   
   ```rust
   use std::any::Any;
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex};

   struct TaskContext {
       // ... existing fields ...
       /// Per-execution plan states, keyed by the plan node's address.
       plan_state: Mutex<HashMap<usize, Arc<dyn PlanState>>>,
   }

   /// Returns the address of a plan node, used as the map key.
   fn plan_addr(plan: &dyn Any) -> usize {
       plan as *const dyn Any as *const () as usize
   }

   impl TaskContext {
       /// Get the state of the passed plan, or register a new one created by `f`.
       pub fn get_or_register_plan_state<F>(
           &self,
           plan: &dyn Any,
           f: F,
       ) -> Arc<dyn PlanState>
       where
           F: FnOnce() -> Arc<dyn PlanState>,
       {
           let addr = plan_addr(plan);
           let mut plan_state = self.plan_state.lock().unwrap();
           // Reuse the existing state, or create and register it on first access.
           let state = plan_state.entry(addr).or_insert_with(f);
           Arc::clone(state)
       }
   }
   ```
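
   The lookup can be exercised in isolation with a minimal, dependency-free sketch. Assumptions: `TaskContext` here is a stand-in holding only the state map, `PlanState` omits `metrics` so that no DataFusion types are needed, and `CounterState` and `DummyExec` are hypothetical. The sketch checks the properties the proposal relies on: partitions of one execution share a single state, distinct nodes get distinct states, and executing the same plan again with a fresh `TaskContext` starts from a clean state.

   ```rust
   use std::any::Any;
   use std::collections::HashMap;
   use std::fmt::Debug;
   use std::sync::{Arc, Mutex};

   /// Simplified `PlanState` (the `metrics` method is omitted in this sketch).
   pub trait PlanState: Debug + Any + Send + Sync {
       fn as_any(&self) -> &dyn Any;
   }

   /// Hypothetical per-execution state: counts `execute` calls on one node.
   #[derive(Debug, Default)]
   struct CounterState {
       calls: Mutex<usize>,
   }

   impl PlanState for CounterState {
       fn as_any(&self) -> &dyn Any {
           self
       }
   }

   /// Stand-in for `TaskContext`: one instance per query execution.
   #[derive(Default)]
   struct TaskContext {
       plan_state: Mutex<HashMap<usize, Arc<dyn PlanState>>>,
   }

   /// Address of a plan node, used as the map key.
   fn plan_addr(plan: &dyn Any) -> usize {
       plan as *const dyn Any as *const () as usize
   }

   impl TaskContext {
       fn get_or_register_plan_state<F>(&self, plan: &dyn Any, f: F) -> Arc<dyn PlanState>
       where
           F: FnOnce() -> Arc<dyn PlanState>,
       {
           let mut map = self.plan_state.lock().unwrap();
           let state = map.entry(plan_addr(plan)).or_insert_with(f);
           Arc::clone(state)
       }
   }

   /// Hypothetical plan node: it holds no execution state of its own.
   struct DummyExec {
       _id: u32,
   }

   impl DummyExec {
       /// Bumps the shared per-execution counter and returns its new value.
       fn execute(&self, _partition: usize, ctx: &TaskContext) -> usize {
           let state =
               ctx.get_or_register_plan_state(self, || Arc::new(CounterState::default()));
           let counter = state.as_any().downcast_ref::<CounterState>().unwrap();
           let mut calls = counter.calls.lock().unwrap();
           *calls += 1;
           *calls
       }
   }

   fn main() {
       let plan = Arc::new(DummyExec { _id: 1 });

       // Execution #1: two partitions of the same node share one state.
       let ctx1 = TaskContext::default();
       assert_eq!(plan.execute(0, &ctx1), 1);
       assert_eq!(plan.execute(1, &ctx1), 2);

       // A different node in the same execution gets its own state.
       let other = DummyExec { _id: 2 };
       assert_eq!(other.execute(0, &ctx1), 1);

       // Execution #2 reuses the same plan with a fresh context: clean state.
       let ctx2 = TaskContext::default();
       assert_eq!(plan.execute(0, &ctx2), 1);
       println!("ok");
   }
   ```

   Nothing in `DummyExec` changes between executions, which is what allows the same plan object to be executed again without `reset_state`.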
   
   Note: if a plan is deallocated, its state is either already held by its streams or no longer needed.
   
   Then, when a particular plan wants to acquire its state, it calls `get_or_register_plan_state`. An example for `CrossJoinExec`:
   
   ```rust
       fn execute(
           &self,
           partition: usize,
           context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           // Fetch the state shared by all partitions of this plan node,
           // creating it on the first call within this query execution.
           let state = context
               .get_or_register_plan_state(self, || Arc::new(CrossJoinExecState::new()));
           // Work with the state: register metrics, spawn workers, etc.
           let left_fut = state
               .as_any()
               .downcast_ref::<CrossJoinExecState>()
               .unwrap()
               .left_fut
               .once(|| {
                   load_left_input(
                       Arc::clone(&self.left),
                       Arc::clone(&context),
                       join_metrics.clone(),
                       reservation,
                   )
               });
           // ...
       }
   ```
   
   Note: this way, a single state is shared across all partitions of the plan.
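
   Sharing state across partitions means expensive work such as loading the build side happens once per execution rather than once per partition. A sketch under stated assumptions: the code above builds an async future once via `left_fut.once(...)`, whereas this sketch swaps in the synchronous `std::sync::OnceLock` and scoped threads to stay dependency-free; `CrossJoinState` and `run_partition` are hypothetical names.

   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::{Arc, OnceLock};
   use std::thread;

   /// Hypothetical per-execution state for a cross join: the build (left) side
   /// is loaded at most once and then shared by every partition.
   #[derive(Default)]
   struct CrossJoinState {
       left: OnceLock<Arc<Vec<i32>>>,
       /// How many times the left side was actually loaded.
       loads: AtomicUsize,
   }

   impl CrossJoinState {
       /// Loads the left side on first use; later callers get the cached result.
       fn left(&self) -> Arc<Vec<i32>> {
           let data = self.left.get_or_init(|| {
               self.loads.fetch_add(1, Ordering::SeqCst);
               Arc::new(vec![1, 2, 3]) // stand-in for reading the left input
           });
           Arc::clone(data)
       }
   }

   /// Runs one partition: pairs every right-side row with every left row
   /// and returns the number of output rows.
   fn run_partition(state: &CrossJoinState, right: &[i32]) -> usize {
       state.left().len() * right.len()
   }

   fn main() {
       let state = CrossJoinState::default();
       let partitions = vec![vec![10, 20], vec![30], vec![40, 50, 60]];

       // Execute all partitions concurrently against the same shared state.
       let state_ref = &state;
       let rows: usize = thread::scope(|s| {
           let handles: Vec<_> = partitions
               .iter()
               .map(|p| s.spawn(move || run_partition(state_ref, p)))
               .collect();
           handles.into_iter().map(|h| h.join().unwrap()).sum()
       });

       assert_eq!(rows, 18); // 3 left rows x (2 + 1 + 3) right rows
       assert_eq!(state.loads.load(Ordering::SeqCst), 1);
       println!("ok");
   }
   ```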
   
   Note: this approach requires a public API change: the methods mentioned in the previous section (`reset_state`, `metrics`, `with_new_state`) should be removed from the `ExecutionPlan` trait itself.
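
   With `metrics` removed from `ExecutionPlan`, metrics would instead be reported by each node's `PlanState`. A hypothetical sketch of reading them back after an execution: `Metrics` stands in for `ExecutionPlanMetricsSet`, and `ScanState`, `ScanExec`, `StateMap` and the `plan_metrics` helper are illustrative names, not proposed API.

   ```rust
   use std::any::Any;
   use std::collections::HashMap;
   use std::fmt::Debug;
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::{Arc, Mutex};

   /// Stand-in for `ExecutionPlanMetricsSet`: a single counter.
   #[derive(Debug, Clone, PartialEq)]
   struct Metrics {
       output_rows: usize,
   }

   /// The proposed `PlanState`, with `Metrics` replacing the real metrics type.
   trait PlanState: Debug + Any + Send + Sync {
       fn as_any(&self) -> &dyn Any;
       fn metrics(&self) -> Option<Metrics>;
   }

   /// Hypothetical state of a scan node; rows are counted during execution.
   #[derive(Debug, Default)]
   struct ScanState {
       output_rows: AtomicUsize,
   }

   impl PlanState for ScanState {
       fn as_any(&self) -> &dyn Any {
           self
       }
       fn metrics(&self) -> Option<Metrics> {
           let output_rows = self.output_rows.load(Ordering::Relaxed);
           Some(Metrics { output_rows })
       }
   }

   /// Hypothetical plan node (non-zero-sized, so each instance has its own address).
   struct ScanExec {
       _table: &'static str,
   }

   type StateMap = Mutex<HashMap<usize, Arc<dyn PlanState>>>;

   fn plan_addr(plan: &dyn Any) -> usize {
       plan as *const dyn Any as *const () as usize
   }

   /// Hypothetical helper: metrics come from the per-execution state map,
   /// not from a `metrics` method on the plan.
   fn plan_metrics(states: &StateMap, plan: &dyn Any) -> Option<Metrics> {
       let addr = plan_addr(plan);
       let states = states.lock().unwrap();
       states.get(&addr).and_then(|state| state.metrics())
   }

   fn main() {
       let plan = ScanExec { _table: "t" };
       let states: StateMap = Mutex::new(HashMap::new());

       // During execution the node registers its state and updates it.
       let state = Arc::new(ScanState::default());
       state.output_rows.fetch_add(5, Ordering::Relaxed);
       states.lock().unwrap().insert(plan_addr(&plan), state);

       // Afterwards, metrics are read from the state registered for the plan.
       assert_eq!(plan_metrics(&states, &plan), Some(Metrics { output_rows: 5 }));
       // A node that never executed in this context has no metrics.
       let idle = ScanExec { _table: "u" };
       assert_eq!(plan_metrics(&states, &idle), None);
       println!("ok");
   }
   ```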
   
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_

