isidentical commented on issue #462: URL: https://github.com/apache/arrow-datafusion/issues/462#issuecomment-1264744158
@Dandandan I've been experimenting with whether support for recursive CTEs could be added without too much overhead (like holding onto the results of every iteration until the recursion is over) and without using any real 'tables' (like the "working table" concept in Postgres, a table into which the previous iteration's results are materialized). It seems like there might be a solution, but before going much deeper and spending a lot of time on the details, I wanted to check whether it makes sense in general (or whether there are obvious obstacles I might have missed) and whether I should submit it as a PR to DataFusion.

### Implementation (very wordy)

[The branch](https://github.com/isidentical/arrow-datafusion/pull/4) contains the implementation (the source code also has a decent amount of documentation left over from my experiments, in case anyone is interested).

```sql
WITH RECURSIVE nodes(node_1) AS (
    SELECT 1 as node_1
    UNION
    SELECT node_1 + 1 as node_1
    FROM nodes
    WHERE node_1 < 100
)
SELECT node_1 FROM nodes;
```

I've tried to keep the changes needed to compile recursive queries localized in the SQL planner, and it seems like we can get away with no changes outside of the CTE compilation part (plus one new logical op). The main workflow for compiling a query like the one above is the following (a simplified sketch of this planner workflow follows after the plan below):

- We split the CTE into two terms (the part before the `UNION` is called the **static term** and the part after it is called the **recursive term**).
- The **static term** is compiled to a logical plan as is.
- The **recursive term** cannot be compiled without special handling, since it includes a back-reference to the table it originates from (`FROM nodes`). To get around this:
  - We create a new 'named relation' (a relation object, like `EmptyRelation`, that has a handle and a schema). The schema comes from the **static term**'s plan.
  - The 'named relation' is registered as a CTE with the name `nodes`, so when the SQL planner resolves that relation, it picks up the 'named relation' plan we created.
  - We compile the **recursive term** and de-register the CTE (since the real `nodes` is actually a different plan).
- We combine the **static term** and the **recursive term** in a new logical operation called `RecursiveQuery` (which also carries the handle name we used to create the named relation) and register it as a CTE.

Here is the plan for the query above:

```
Projection: #nodes.node_1
  RecursiveQuery: name=nodes, is_distinct=false                  <--------
    Projection: Int64(1) AS node_1, alias=nodes
      EmptyRelation
    Projection: #nodes.node_1 + Int64(1) AS node_1, alias=nodes
      Filter: #nodes.node_1 < Int64(100)
        NamedRelation: nodes                                     <--------
```

In terms of the physical plan, for each `RecursiveQuery` we create a `RecursiveQueryExec` (very similar in terms of fields), and for each `NamedRelation` we create a `ContinuanceExec` (the stream it produces is a `ContinuationStream`; more details below).
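To make the register/compile/de-register dance above concrete, here is a minimal, self-contained Rust sketch of the planner workflow. The types are simplified stand-ins, not the actual DataFusion structs (the real `RecursiveQuery` and `NamedRelation` nodes in the branch carry schemas and expression trees), and `plan_recursive_cte` plus the stubbed `compiled(...)` steps are hypothetical helpers for illustration only:

```rust
use std::collections::HashMap;

/// Simplified stand-ins for the logical plan nodes described above.
#[derive(Debug, Clone)]
enum LogicalPlan {
    /// Placeholder relation with a handle; resolved at execution time.
    NamedRelation { name: String },
    /// Combines the static and recursive terms under one handle.
    RecursiveQuery {
        name: String,
        static_term: Box<LogicalPlan>,
        recursive_term: Box<LogicalPlan>,
        is_distinct: bool,
    },
    /// Any other already-compiled plan (projection, filter, ...).
    Other(String),
}

/// Hypothetical helper mirroring the workflow from the list above.
fn plan_recursive_cte(
    ctes: &mut HashMap<String, LogicalPlan>,
    name: &str,
    static_sql: &str,
    recursive_sql: &str,
) -> LogicalPlan {
    // 1. The static term compiles as-is (stubbed out here).
    let static_term = LogicalPlan::Other(format!("compiled({static_sql})"));

    // 2. Register the named relation under the CTE name, so the
    //    self-reference (`FROM nodes`) inside the recursive term
    //    resolves to the placeholder instead of erroring out.
    ctes.insert(
        name.to_string(),
        LogicalPlan::NamedRelation { name: name.to_string() },
    );

    // 3. Compile the recursive term against the placeholder (stubbed).
    let recursive_term = LogicalPlan::Other(format!("compiled({recursive_sql})"));

    // 4. Replace the placeholder CTE entry with the combined node, so
    //    later references to the CTE see the full recursive query.
    let plan = LogicalPlan::RecursiveQuery {
        name: name.to_string(),
        static_term: Box::new(static_term),
        recursive_term: Box::new(recursive_term),
        is_distinct: false,
    };
    ctes.insert(name.to_string(), plan.clone());
    plan
}

fn main() {
    let mut ctes = HashMap::new();
    let plan = plan_recursive_cte(
        &mut ctes,
        "nodes",
        "SELECT 1 as node_1",
        "SELECT node_1 + 1 FROM nodes WHERE node_1 < 100",
    );
    println!("{plan:#?}");
}
```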
Once these are translated, the final representation looks something like this:

```
ProjectionExec: expr=[node_1@0 as node_1]
  RecursiveQueryExec: name=nodes, is_distinct=false
    CoalescePartitionsExec
      ProjectionExec: expr=[1 as node_1]
        RepartitionExec: partitioning=RoundRobinBatch(16)
          EmptyExec: produce_one_row=true
    CoalescePartitionsExec
      ProjectionExec: expr=[node_1@0 + 1 as node_1]
        CoalesceBatchesExec: target_batch_size=4096
          FilterExec: node_1@0 < 100
            RepartitionExec: partitioning=RoundRobinBatch(16)
              ContinuanceExec: name=nodes
```

The names in the sketch might be a bit cryptic, so here is a brief explanation:

- T'0 corresponds to the **static term**; it is executed without any access to previous data.
- Each batch T'0 produces is mirrored directly to the parent operation (e.g. a filter, like F), so it does not suffer from buffering.
- For the batches we are mirroring, we also save a copy to an in-memory store (L).
- Once T'0 ends, L decides whether to run another iteration based on the rows it collected (the previous term must produce at least one new [sometimes unique, depending on the case] row). If it does, it sets up an MPSC channel and starts streaming the buffered batches. Once that is done, it executes the **recursive term** as T'1.
- T'1 repeats these steps, and if it successfully produces new rows, a new stream T'2 is spawned (and all the buffered data from T'1 is transferred to it).
- Once T'2 fails to produce any new rows (e.g. a `WHERE n <= 100` clause where all `n`s are `> 100`), it finishes operating.

The parent operation receives its input no matter what happens internally (in terms of buffering). The buffering currently holds only the previous iteration's records, but we might also want to keep a hashset of all the rows we have seen once we add support for the distinct `UNION` syntax with recursion (it is pretty useful and prevents most of the infinite recursion cases). A minimal sketch of this iteration loop is given below.
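As a sanity check of the semantics, here is a synchronous Rust sketch of the T'0 → T'1 → T'2 loop, with `Vec<i64>` standing in for Arrow record batches and plain closures standing in for the static/recursive physical plans. This is an assumption-heavy simplification: the real `RecursiveQueryExec` streams batches to the parent through an MPSC channel rather than returning materialized vectors, and `recursive_query_exec` here is a hypothetical name:

```rust
/// Minimal model of the iteration loop described above.
fn recursive_query_exec(
    static_term: impl Fn() -> Vec<i64>,
    // The recursive term consumes the previous iteration's buffer
    // (the role played by L / `ContinuanceExec` above).
    recursive_term: impl Fn(&[i64]) -> Vec<i64>,
) -> Vec<i64> {
    let mut output = Vec::new();

    // T'0: run the static term; mirror every row to the parent
    // (here: push into `output`) while keeping a copy in `buffer`.
    let mut buffer = static_term();
    output.extend_from_slice(&buffer);

    // T'1, T'2, ...: keep iterating while the previous iteration
    // produced at least one row.
    while !buffer.is_empty() {
        let next = recursive_term(&buffer);
        output.extend_from_slice(&next);
        // Hand the fresh rows to the next iteration; the previous
        // buffer is dropped, so only one iteration's worth of data
        // is ever retained.
        buffer = next;
    }
    output
}

fn main() {
    // Mirrors the example query: seed with 1, then repeatedly
    // apply `SELECT node_1 + 1 ... WHERE node_1 < 100`.
    let rows = recursive_query_exec(
        || vec![1],
        |prev| prev.iter().filter(|n| **n < 100).map(|n| n + 1).collect(),
    );
    assert_eq!(rows.len(), 100); // produces 1..=100, like Postgres
    println!("{} rows, last = {}", rows.len(), rows.last().unwrap());
}
```

Note that, as in the description above, the loop only ever holds the previous iteration's rows; adding the distinct `UNION` behavior would mean also threading a seen-rows hashset through each iteration and filtering `next` against it.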