robtandy opened a new pull request, #60:
URL: https://github.com/apache/datafusion-ray/pull/60
@andygrove, per our collaboration around this, here is the requested PR against
main. Tagging @alamb here as well for his additional insight and perspective
on query execution strategies, and to follow up on the presentation given
at the DataFusion Community Meeting.
## TL;DR
This PR contains a pull-based stage execution model where stages are
connected using Arrow Flight. It is simpler and more performant than previous
iterations of the streaming rewrite. It works on TPCH SF100 and below. Larger
scale factors have not been tested, though I expect it to parallelize well at
the expense of some execution overhead.
## Evolution of this work
This represents the third iteration of attempts to stream data between
stages. A brief accounting of those efforts might be useful to capture here:
1. Try to use the Ray Object Store to stream batches between stages.
This was a challenge for two reasons. The first was that under high
throughput of potentially small items, the object store added too much latency
to query processing. The second, and the reason this approach was abandoned,
is that creating a shuffle writer exec plan node that jumps from Rust to
Python to interact with the object store, and potentially calls back into
Rust itself, proved difficult to manage and reason about.
2. The second attempt, and the one I have discussed on Discord, was to adopt
Arrow Flight for streaming between stages and to flip the execution of the
stages from pull to push. The thinking was to have each stage eagerly execute
and stream batches to an `Exchange` Actor, which would hold a set of channels
(num stages x num partitions per stage) and allow subsequent stages to consume
from them.
The problems here were that the `Exchange` Actor was difficult to tune
and created an additional Arrow Flight hop. Another challenge was that
DataFusion is inherently a pull-based architecture, one that is very easy to
compose and reason about. Flipping this was like swimming upstream and
resulted in a lot of complications that DataFusion already elegantly manages.
While it was interesting to consider push execution, and it may inform
future work on consuming from streams and materializing query results,
ultimately it meant reimplementing a lot of things that DataFusion just makes
easy.
3. The third attempt, this iteration, is purely pull based and uses
Arrow Flight to stream between stages. This turned out to produce the smallest
amount of code, and code that was easy to work with and debug. It is as if you
are executing DataFusion locally, but some of the execution nodes are connected
with Arrow Flight instead of channels.
There is more that can be improved, both performance- and ergonomics-wise, but
this is quite usable as it is, and it will allow others to review and collaborate.
For examples, see the main README, `examples/tips.py`, and `tpch/tpc.py`.
## Execution Strategy
DataFusion Ray will optimize a query plan and then break it into stages.
Those stages are scheduled as Ray Actors and make their partition streams
available over Arrow Flight.
Connecting one stage to another means adding a `RayStageReaderExec` node
within the consuming stage wherever a connection is required; at execution
time that node fetches the stream using a `FlightClient`.
## Tunables:
* `--isolate` By default, DataFusion Ray will attempt to host each Stage as its
own actor. This flag (in the examples, and a parameter to the `RayContext`)
tells DataFusion Ray to host _each partition of each stage_ as its own Actor.
This dramatically increases parallelism, but it is a blunt instrument; a more
finely tuned choice (like splitting a stage into x parts) would be preferable,
and can be added in a future update.
* `--concurrency` controls the partition count for all stages and is planned
using DataFusion before control is submitted to Ray. This interacts with
`--isolate`.
* `--batch-size` controls the target (and also maximum) batch size
exchanged between stages. Currently 8192 works for all queries in TPCH SF100.
Going higher can produce Flight errors as we exceed the maximum Flight payload
size.