isidentical commented on issue #462:
URL: https://github.com/apache/arrow-datafusion/issues/462#issuecomment-1264744158

   @Dandandan I've been experimenting with whether recursive CTEs could be supported without too much overhead (such as holding onto the results of every iteration until the recursion is over) and without using any real 'tables' (like the "working table" concept in Postgres, a table where the previous iteration's results are materialized). It seems like there is a viable approach, but before going much deeper and spending a lot of time on the details, I wanted to check whether it makes sense in general (or whether there are obvious obstacles I might have missed) and whether I should submit it as a PR to DataFusion.
   
   ### Implementation (very wordy)
   
   [The branch is here](https://github.com/isidentical/arrow-datafusion/pull/4) (the source code also has a decent amount of documentation left over from my experiments, in case anyone is interested).
   
   ```sql
   WITH RECURSIVE nodes(node_1) AS (
       SELECT 1 as node_1
     UNION
       SELECT node_1+1 as node_1 FROM nodes WHERE node_1 < 100
   )
   SELECT node_1 FROM nodes;
   ```
   
   I've tried to keep the changes needed to compile recursive queries localized in the SQL planner, and it seems like we can get away with no changes outside of the `CTE` compilation part (plus one new logical operator). The main workflow for compiling a query like the one above is the following:
   
   - We split the CTE into two terms (the part before the `UNION` is called the **static term** and the part after it the **recursive term**).
   - The **static term** is compiled to a logical plan as is.
   - The **recursive term** cannot be compiled without special handling, since it contains a back-reference to the table it originates from (`FROM nodes`). To get around this:
     - We create a new 'named relation' (a relation object, like `EmptyRelation`, that has a handle and a schema). The schema comes from the **static term**'s plan.
     - The 'named relation' is registered as a CTE with the name `nodes`, so when the SQL planner resolves that relation, it will pick up the 'named relation' plan we created.
     - We compile the **recursive term** and then de-register the CTE (since the real `nodes` is actually a different plan).
   - We combine the **static term** and the **recursive term** in a new logical operation called `RecursiveQuery` (which also carries the handle name used to create the named relation) and register it as a CTE.
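   The registration dance above could be sketched roughly like this. Everything below is a hypothetical simplification for illustration (the enum, the planner struct, and the stubbed term compilation are made up and are not the actual DataFusion API):

   ```rust
   use std::collections::HashMap;

   /// A drastically simplified logical plan, just enough to show the shape
   /// of the workflow (hypothetical; not the real DataFusion `LogicalPlan`).
   #[derive(Debug, Clone, PartialEq)]
   enum LogicalPlan {
       EmptyRelation,
       /// Placeholder standing in for the recursive CTE's own output.
       NamedRelation { name: String },
       Projection { input: Box<LogicalPlan> },
       RecursiveQuery {
           name: String,
           static_term: Box<LogicalPlan>,
           recursive_term: Box<LogicalPlan>,
           is_distinct: bool,
       },
   }

   struct Planner {
       /// CTE name -> plan; consulted when resolving a `FROM <name>` reference.
       ctes: HashMap<String, LogicalPlan>,
   }

   impl Planner {
       fn plan_recursive_cte(&mut self, name: &str) -> LogicalPlan {
           // 1. Compile the static term as-is (stubbed out here).
           let static_term = LogicalPlan::Projection {
               input: Box::new(LogicalPlan::EmptyRelation),
           };

           // 2. Register a named relation under the CTE's name, so that the
           //    back-reference (`FROM nodes`) resolves to the placeholder.
           let placeholder = LogicalPlan::NamedRelation { name: name.to_string() };
           self.ctes.insert(name.to_string(), placeholder);

           // 3. Compile the recursive term; its scan of `name` now resolves to
           //    the NamedRelation through the registry (resolution stubbed out).
           let recursive_term = LogicalPlan::Projection {
               input: Box::new(self.ctes[name].clone()),
           };

           // 4. De-register the placeholder and register the combined
           //    `RecursiveQuery` as the real definition of the CTE.
           self.ctes.remove(name);
           let plan = LogicalPlan::RecursiveQuery {
               name: name.to_string(),
               static_term: Box::new(static_term),
               recursive_term: Box::new(recursive_term),
               is_distinct: false,
           };
           self.ctes.insert(name.to_string(), plan.clone());
           plan
       }
   }
   ```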
   
   Here is the plan for the query above:
   ```
   Projection: #nodes.node_1
     RecursiveQuery: name=nodes, is_distinct=false                  <--------
       Projection: Int64(1) AS node_1, alias=nodes
         EmptyRelation
       Projection: #nodes.node_1 + Int64(1) AS node_1, alias=nodes
         Filter: #nodes.node_1 < Int64(100)
           NamedRelation: nodes                                     <--------
   ```
   
   In terms of the physical plan, each `RecursiveQuery` becomes a `RecursiveQueryExec` (very similar in terms of fields), and each `NamedRelation` becomes a `ContinuanceExec`, whose output is a `ContinuationStream` (more details below). Once these are translated, the final representation looks something like this:
   
   ```
   ProjectionExec: expr=[node_1@0 as node_1]                       
     RecursiveQueryExec: name=nodes, is_distinct=false                         
       CoalescePartitionsExec                                      
         ProjectionExec: expr=[1 as node_1]                        
           RepartitionExec: partitioning=RoundRobinBatch(16)       
             EmptyExec: produce_one_row=true                       
       CoalescePartitionsExec                                      
         ProjectionExec: expr=[node_1@0 + 1 as node_1]             
           CoalesceBatchesExec: target_batch_size=4096             
             FilterExec: node_1@0 < 100                            
               RepartitionExec: partitioning=RoundRobinBatch(16)   
                 ContinuanceExec: name=nodes  
   ```
   
   
![image](https://user-images.githubusercontent.com/47358913/193478214-0eeaa480-fc01-4eb5-84ac-2a01f6f3d26b.png)
   
   The naming in the sketch might be a bit cryptic, so here is a brief explanation:
   - T'0 corresponds to the **static term**; it executes without any access to previous data.
   - Each batch T'0 produces is mirrored directly to the parent operation (e.g. a filter, like F), so the parent does not suffer from buffering.
   - For each batch we mirror, we also save a copy into an in-memory store (L).
   - Once T'0 ends, L decides whether to run another iteration based on the rows it collected (the previous term must have produced at least one new [sometimes unique, depending on the case] row). If so, it sets up an MPSC channel, starts streaming the buffered batches, and then executes the **recursive term** as T'1.
   - T'1 repeats these steps, and if it successfully produces new rows, a new stream T'2 is spawned (and all the buffered data from T'1 is transferred to it).
   - Once T'2 fails to produce any more rows (e.g. a `WHERE n <= 100` clause where every `n` is already > 100), the whole operation finishes.
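   Stripped of streams, channels, and partitioning, the iteration scheme above is a fixpoint loop: run a term, forward its output while buffering a copy, and stop as soon as an iteration produces no rows. A minimal single-threaded sketch over plain vectors instead of Arrow record batches (all names here are made up for illustration):

   ```rust
   /// Evaluate `static_term UNION ALL recursive_term(previous_iteration)`
   /// by looping until an iteration yields no rows.
   fn recursive_query<T: Clone>(
       static_term: impl Fn() -> Vec<T>,
       recursive_term: impl Fn(&[T]) -> Vec<T>,
   ) -> Vec<T> {
       let mut output = Vec::new();

       // T'0: the static term runs with no access to previous data. Its rows
       // are mirrored to the parent (`output`) while a copy stays buffered
       // as the working set (the in-memory store L above).
       let mut working = static_term();
       output.extend(working.iter().cloned());

       // T'1, T'2, ...: keep iterating while the previous term produced rows.
       while !working.is_empty() {
           // The buffered rows play the role of the NamedRelation /
           // ContinuanceExec input for this iteration, and the buffer is
           // then replaced by the iteration's own output.
           working = recursive_term(&working);
           output.extend(working.iter().cloned());
       }
       output
   }
   ```

   The example query from the top maps onto this with `static_term` producing `[1]` and `recursive_term` applying the filter-and-increment, yielding the numbers 1 through 100.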
   
   The parent operation receives its input no matter what happens internally (in terms of buffering). The buffering currently covers only the previous iteration's records, but we might also want to keep a hashset of all the rows we have ever seen once we add support for the deduplicating `UNION` (as opposed to `UNION ALL`) semantics in recursive queries (it is pretty useful and prevents most of the infinite recursion cases).
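   That dedup step could be as small as a set-filter applied to each iteration's output before it is buffered or forwarded. A hypothetical sketch (over plain hashable values rather than hashed Arrow rows):

   ```rust
   use std::collections::HashSet;
   use std::hash::Hash;

   /// Keep only the rows that no earlier iteration has produced.
   /// `HashSet::insert` returns `false` for duplicates, so previously
   /// seen rows are dropped while `seen` accumulates the full history.
   fn dedup_iteration<T: Eq + Hash + Clone>(seen: &mut HashSet<T>, batch: Vec<T>) -> Vec<T> {
       batch
           .into_iter()
           .filter(|row| seen.insert(row.clone()))
           .collect()
   }
   ```

   With this in place, a recursive term that keeps re-emitting rows it has already produced ends up with an empty iteration, which terminates the loop instead of recursing forever.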

