Yes, you could use Acero for this. However, I would hope that someday you could also use DuckDB and DataFusion to do the combining as well.
In my mind an "engine" is something that takes a plan (Substrait) and zero or more input streams (Arrow C stream interface [1]) and has one output stream (Arrow C stream interface). So, for example, if "combine them without considering the source they came from" means "interleave the batches into a common stream", then you could create a Substrait plan with two named table relations (input0 and input1) and a set union-all relation. Then you could execute your DuckDB plan and your DataFusion plan. You would now have two Arrow streams and a Substrait plan. You could pass those into Acero, DuckDB, or DataFusion to do the actual execution of the plan. Today I think only Acero has the ability to input C streams as named tables, so I think you could only do it with Acero. But it should be a small lift for DataFusion or DuckDB to support. In pyarrow it would look something like:

```
# Execute a query with datafusion and obtain a stream
datafusion_c_stream = ...
datafusion_reader = pyarrow.RecordBatchReader._import_from_c(datafusion_c_stream)

# Execute a query with duckdb and obtain a stream
duckdb_c_stream = ...
duckdb_reader = pyarrow.RecordBatchReader._import_from_c(duckdb_c_stream)

# Load the plan from a file or build it programmatically
# with something like ibis or PySubstrait
plan = ...

def provide_table(names):
    if names[0] == "input0":
        return datafusion_reader
    else:
        return duckdb_reader

pyarrow.substrait.run_query(plan, table_provider=provide_table)
```

I'm probably missing a few details, and pyarrow doesn't actually let you return a RecordBatchReader from a table provider (it is doable in C++), but that's the rough idea.
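For intuition, the set union-all relation in that plan does no reordering or deduplication; it simply forwards batches from both inputs into one output stream. Here is a toy Python sketch of that effect (plain lists stand in for Arrow batch streams; a real engine would pull RecordBatches from the C stream interface, possibly concurrently):

```
def union_all(*streams):
    """Interleave batches from several input streams into one output
    stream, with no ordering guarantee across inputs.

    This mimics the effect of a Substrait set union-all relation over
    streaming inputs: batches pass through unchanged; only the source
    distinction is erased.
    """
    iterators = [iter(s) for s in streams]
    while iterators:
        # Round-robin over whichever inputs still have batches.
        for it in list(iterators):
            batch = next(it, None)
            if batch is None:
                iterators.remove(it)
            else:
                yield batch
```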
[1] https://arrow.apache.org/docs/format/CStreamInterface.html

On Mon, Apr 10, 2023 at 4:36 PM Will Ayd <william....@icloud.com.invalid> wrote:

> I am still wrapping my head around some of the technologies so excuse any ignorance, but seeing as the OP mentioned the use case of /switching/ between execution engines, is there not a gap if the concern is more about /combining/ execution engines? AFAIU Substrait would allow me to submit different queries to DuckDB and DataFusion - if I wanted to take these results back and combine them without considering the source they came from, is Acero not the right tool for the job?
>
> On 3/14/23 11:50, Li Jin wrote:
> > Late to the party.
> >
> > Thanks Weston for sharing the thoughts around Acero. We are actually a pretty heavy Acero user right now and are trying to take part in Acero maintenance and development. Internally we are using Acero for a time series streaming data processing system.
> >
> > I would +1 on many of Weston's directions here, in particular making Acero extensible / customizable. IMO Acero might not be the fastest "Arrow SQL/TPC-H" engine, but the ability to customize it for ordered time series is a huge/killer feature.
> >
> > In addition to what Weston has already said, my other two cents is that I think Acero would benefit from a separation from the Arrow core C++ library, similar to how Arrow Flight is. The main reason is that Arrow core, being such a widely used library, benefits more from being stable, while Acero, being a relatively new and standalone component, benefits more from fast moving / quick experiments. My colleague and I are working on https://github.com/apache/arrow/issues/15280 to make this happen.
> >
> > On Fri, Mar 10, 2023 at 5:59 AM Andrew Lamb <al...@influxdata.com> wrote:
> >
> >> I don't know much about the Acero user base, but gathering some significant term users (e.g. Ballista, Urban Logiq, GreptimeDB, InfluxDB IOx, etc) has been very helpful for DataFusion. Not only do such users bring some amount of maintenance capacity, but perhaps more relevantly to your discussion they bring a focus to the project with their use cases.
> >>
> >> With so many possible tradeoffs (e.g. streaming vs larger batch execution as you mention above), having people to help focus the choices of the project I think has served DataFusion well.
> >>
> >> If Acero has such users (or potential users), perhaps reaching out to them / soliciting their ideas of where they want to see the project go would be a valuable focusing exercise.
> >>
> >> Andrew
> >>
> >> On Thu, Mar 9, 2023 at 6:35 PM Aldrin <akmon...@ucsc.edu.invalid> wrote:
> >>
> >>> Thanks for sharing! There are a variety of things that I didn't know about (such as ExecBatchBuilder) and it's interesting to hear about the performance challenges.
> >>>
> >>> How much would future Substrait work involve integration with Acero? I'm curious how much more support of Substrait is seen as valuable (should be prioritized) or if additional support is going to be "as-needed". Note that I have a minimal understanding of how "large" Substrait is and what proportion of it is already supported by Acero.
> >>>
> >>> Aldrin Montana
> >>> Computer Science PhD Student
> >>> UC Santa Cruz
> >>>
> >>> On Thu, Mar 9, 2023 at 12:33 PM Antoine Pitrou <anto...@python.org> wrote:
> >>>
> >>>> Just a reminder for those following other implementations of Arrow, that Acero is the compute/execution engine subsystem baked into Arrow C++.
> >>>>
> >>>> Regards
> >>>>
> >>>> Antoine.
> >>>>
> >>>> Le 09/03/2023 à 21:20, Weston Pace a écrit :
> >>>>> We are getting closer to another release. I am thinking about what to work on in the next release.
> >>>>> I think it is a good time to have a discussion about Acero in general. This is possibly also of interest to those working on pyarrow or r-arrow, as these libraries rely on Acero for various functionality. Apache projects have no single owner and what follows is only my own personal opinion and plans. Still, I will apologize in advance for any lingering hubris or outrageous declarations of fact :)
> >>>>>
> >>>>> First, some background. Since we started the project the landscape has changed. Most importantly, there are now more Arrow-native execution engines: for example, DataFusion, DuckDB, Velox, and I'm sure there are probably more. Substrait has also been created, allowing users to hopefully switch between different execution engines as different needs arise. Some significant contributors to Acero have taken a break or moved on to other projects, and new contributors have arrived with new interests and goals (for example, an as-of join node and more focus on ordered / streaming execution).
> >>>>>
> >>>>> I do not personally have the resources for bringing Acero's performance to match that of some of the other execution engines. I'm also not aware of any significant contributors attempting to do so. I also think that having yet another engine racing to the top of the TPC-H benchmarks is not the best thing we can be doing for our users. To be clear, our performance is not "bad" but it is not "state of the art".
> >>>>>
> >>>>> ## Some significant performance challenges for Acero:
> >>>>>
> >>>>> 1. Ideally an execution engine that wants to win TPC-H should operate on L2-sized batches. To risk stating the obvious: that is not very large. Typically less than 100k rows.
> >>>>> At that size of operation, the philosophy of "we are only doing this per-batch so we don't have to be worried about performance" falls apart. Significant pieces of Acero are not built to operate effectively at this small of a batch size. This is probably most evident in our expression evaluation and in queries that have complex expressions invoking many functions.
> >>>>>
> >>>>> 2. Our expression evaluation is missing a fair number of optimizations: the ability to use temporary vectors instead of allocating new vectors between function calls, usage of selection vectors to avoid materializing filter results, and general avoidance of allocation with a preference for thread-local data.
> >>>>>
> >>>>> 3. Writing a library of compute functions that is compact, able to run on any architecture, and able to take full advantage of the underlying hardware is an extremely difficult challenge, and there are likely things that could be improved in our kernel functions.
> >>>>>
> >>>>> 4. Acero does no query optimization. Hopefully Substrait optimizers will emerge to fill this gap. In the meantime, this remains a significant gap when comparing Acero to most other execution engines.
> >>>>>
> >>>>> I am not (personally) planning on addressing any of the above issues (no time and little interest). Furthermore, other execution engines either already handle these things or are investing significant funds to make sure they can. In fact, I would be in favor of explicitly abandoning the morsel-batch model and focusing on larger batch sizes in the spirit of simplicity.
> >>>>>
> >>>>> This does not mean that I want to abandon Acero.
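As a side note on the selection-vector optimization mentioned in point 2: instead of materializing a filtered copy of every column after a predicate, an engine can carry a vector of passing row indices and evaluate later functions only at those positions. A minimal Python sketch of the idea (illustrative only, not Acero's implementation; the expression and column names are made up):

```
def evaluate_with_selection(column_a, column_b):
    """Evaluate the expression (a > 0) AND (b / a > 2) over two columns.

    Rather than materializing filtered copies of the columns after the
    first predicate, we keep a "selection vector" of passing row indices
    and evaluate the second predicate only at those positions.
    """
    # First predicate produces a selection vector, not a new column.
    selection = [i for i, a in enumerate(column_a) if a > 0]

    # The second predicate runs only on selected rows, which also makes
    # the division safe (a != 0 at every selected index).
    selection = [i for i in selection if column_b[i] / column_a[i] > 2]
    return selection
```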
> >>>>> Acero is valuable for a number of users who don't need that last 20% of performance and would rather not introduce a new library. Acero has been a valuable building block for those that are exploring unique execution models or whose workloads don't cleanly fit into a SQL query. Acero has been used effectively for academic research. Acero has been valuable for me personally as a sort of "reference implementation" for a Substrait consumer, as well as being a reference engine for connectivity and decentralization in general.
> >>>>>
> >>>>> ## My roadmap
> >>>>>
> >>>>> Over the next year I plan on transitioning more time into Substrait work. But this is less because I am abandoning Acero and more because I would like to start wrapping Acero up. In my mind, Acero as an "extensible streaming execution engine" is very nearly "complete" (as much as anything is ever complete).
> >>>>>
> >>>>> 1. One significant remaining challenge is getting some better tools in place for reducing runtime memory usage. This mostly equates to being smarter about scanning (in particular how we scan large row groups) and adding support for spilling to pipeline breakers (there is a promising PR for this that I have not yet been able to get around to). I would like to find time to address these things over the next year.
> >>>>>
> >>>>> 2. I would like Acero to be better documented and more extensible. It should be relatively simple (and hopefully as foolproof as possible) for users to create their own extension nodes. Perhaps we could even support Python extension nodes. There has been some promising work around Substrait extension nodes which I think could be generalized to allow extension node developers to use Substrait without having to create .proto files.
> >>>>>
> >>>>> 3. Finally, pyarrow is a toolbox. I would like to see some of the internal compute utilities exposed as their own tools (and pyarrow bindings added). Significantly (though I don't think I'll get to all of these):
> >>>>>
> >>>>> * The ExecBatchBuilder is a useful accumulation tool. It could be used, for example, to load datasets into pandas that are almost as big as RAM (today you typically need at least 2x memory to convert to pandas).
> >>>>> * The GroupingSegmenter could be used to support workflows like "group by X and then give me a pandas dataframe for each group".
> >>>>> * Whatever utilities we develop for spilling could be useful as temporary caches.
> >>>>> * There is an entire row-based encoding and hash table built in there somewhere.
> >>>>>
> >>>>> There are also a few things that I would love to see added, but I don't expect to be able to get to them myself anytime soon. If anyone is interested, feel free to reach out and I'd be happy to brainstorm implementation. Off the top of my head:
> >>>>>
> >>>>> * Support for window functions (for those of you that are not SQL insiders, this means "functions that rely on row order", like cumulative sum, rank, or lag)
> >>>>>   * We have most of the basic building blocks, so a relatively naive implementation shouldn't be a huge stretch.
> >>>>> * Support for a streaming merge join (e.g. if the keys are ordered on both inputs you don't have to accumulate all of one input)
> >>>>>
> >>>>> I welcome any input and acknowledge that any or all of this could change completely in the next 3 months.
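To make the streaming merge join idea concrete: when both inputs arrive sorted on the join key, the join only ever needs to buffer the current run of equal keys from each side, so memory is bounded by the largest key group rather than by an entire input. A minimal Python sketch of an inner join over sorted (key, value) iterators (illustrative only, not Acero code):

```
def streaming_merge_join(left, right):
    """Inner-join two iterators of (key, value) pairs, both sorted by key.

    Only the current run of equal keys is buffered from each side, so
    memory use is bounded by the largest key group, not a whole input.
    """
    left, right = iter(left), iter(right)

    def next_run(it, pending):
        # Collect the run of consecutive rows sharing the next key.
        if pending is None:
            pending = next(it, None)
        if pending is None:
            return None, None, None  # input exhausted
        key, run = pending[0], [pending[1]]
        pending = next(it, None)
        while pending is not None and pending[0] == key:
            run.append(pending[1])
            pending = next(it, None)
        return key, run, pending

    lkey, lrun, lpend = next_run(left, None)
    rkey, rrun, rpend = next_run(right, None)
    while lrun is not None and rrun is not None:
        if lkey == rkey:
            for lv in lrun:
                for rv in rrun:
                    yield (lkey, lv, rv)
            lkey, lrun, lpend = next_run(left, lpend)
            rkey, rrun, rpend = next_run(right, rpend)
        elif lkey < rkey:
            lkey, lrun, lpend = next_run(left, lpend)
        else:
            rkey, rrun, rpend = next_run(right, rpend)
```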