Yes, you could use Acero for this.  However, I would hope that someday you
could also use DuckDB and DataFusion to do the combining as well.

In my mind an "engine" is something that takes a plan (Substrait) and zero
or more input streams (Arrow C stream interface[1]) and has one output
stream (Arrow C stream interface).
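
Concretely, here is a minimal sketch of that contract in Python type terms
(this is just an illustration of the mental model, not an existing API):

```
from typing import Callable, Sequence

import pyarrow as pa

# Hypothetical signature only: an "engine" maps a serialized Substrait plan
# plus zero or more input Arrow streams (shown here as RecordBatchReaders,
# which wrap the C stream interface) to a single output Arrow stream.
Engine = Callable[[bytes, Sequence[pa.RecordBatchReader]], pa.RecordBatchReader]
```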

So, for example, if "combine them without considering the source they came
from" means "interleave the batches into a common stream", then you could
create a Substrait plan with two named table relations (input0 and input1)
and a UNION ALL set relation.  Then you could execute your DuckDB plan and
your DataFusion plan.  You would now have two Arrow streams and a Substrait
plan.  You could pass those into Acero, DuckDB, or DataFusion to do the
actual execution of the plan.
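
For illustration, the rough shape of that plan, written here as a Python dict
that mirrors Substrait's protobuf-JSON form (the baseSchema fields are omitted
and the output names are placeholders; a real plan would need to match the
schemas of your streams):

```
# Unvalidated sketch: two named-table reads ("input0", "input1") feeding a
# UNION ALL set relation.
union_all_plan = {
    "relations": [{
        "root": {
            "input": {
                "set": {
                    "op": "SET_OP_UNION_ALL",
                    "inputs": [
                        {"read": {"namedTable": {"names": ["input0"]}}},
                        {"read": {"namedTable": {"names": ["input1"]}}},
                    ],
                }
            },
            "names": ["col0", "col1"],  # placeholder output column names
        }
    }]
}
```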

Today I think only Acero has the ability to accept C streams as named
tables, so for now you could only do this last step with Acero.  But it
should be a small lift for DataFusion or DuckDB to add.  In pyarrow it would
look something like...

```
import pyarrow
import pyarrow.substrait

datafusion_c_stream = ...  # Execute query with datafusion and obtain a stream
datafusion_reader = pyarrow.RecordBatchReader._import_from_c(datafusion_c_stream)

duckdb_c_stream = ...  # Execute query with duckdb and obtain a stream
duckdb_reader = pyarrow.RecordBatchReader._import_from_c(duckdb_c_stream)

# Load plan from file or build programmatically with something like ibis or
# PySubstrait
plan = ...

def provide_table(names):
    if names[0] == "input0":
        return datafusion_reader
    else:
        return duckdb_reader

pyarrow.substrait.run_query(plan, table_provider=provide_table)
```

I'm probably missing a few details, and pyarrow doesn't actually let you
return a RecordBatchReader from a table provider (it's doable in C++), but
that's the rough idea.
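
For what it's worth, in Python you may not even need to touch the C stream
interface directly, since both engines can already hand back pyarrow objects.
A hedged sketch of one way to obtain the two input readers (the method names
come from the duckdb and datafusion Python packages as I understand them and
may differ by version; the queries and file names here are made up):

```
import duckdb
import pyarrow as pa
from datafusion import SessionContext

# DuckDB can return a pyarrow RecordBatchReader directly (the C stream
# interface is used under the hood).
duckdb_reader = duckdb.sql("SELECT * FROM 'left.parquet'").fetch_record_batch()

# DataFusion's Python bindings can collect pyarrow record batches, which can
# then be wrapped back up as a reader.  Note that collect() materializes the
# result; a true stream would need the engine to export a C stream instead.
ctx = SessionContext()
ctx.register_parquet("right_table", "right.parquet")
batches = ctx.sql("SELECT * FROM right_table").collect()
datafusion_reader = pa.Table.from_batches(batches).to_reader()
```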

[1] https://arrow.apache.org/docs/format/CStreamInterface.html

On Mon, Apr 10, 2023 at 4:36 PM Will Ayd <william....@icloud.com.invalid>
wrote:

> I am still wrapping my head around some of the technologies so excuse
> any ignorance, but seeing as the OP mentioned the use case of /switching/
> between execution engines, is there not a gap if the concern is more
> about /combining/ execution engines? AFAIU Substrait would allow me to
> submit different queries to DuckDB and Datafusion - if I wanted to take
> these results back and combine them without considering the source they
> came from, is Acero not the right tool for the job?
>
> On 3/14/23 11:50, Li Jin wrote:
> > Late to the party.
> >
> > Thanks Weston for sharing the thoughts around Acero. We are actually a
> > pretty heavy Acero user right now and are trying to take part in Acero
> > maintenance and development. Internally we are using Acero for a time
> > series streaming data processing system.
> >
> > I would +1 on many of Weston's directions here, in particular to make Acero
> > extensible / customizable. IMO Acero might not be the fastest "Arrow
> > SQL/TPC-H" engine, but the ability to customize it for ordered time series
> > is a huge/killer feature.
> >
> > In addition to what Weston has already said, my other two cents is that I
> > think Acero would benefit from being separated from the Arrow core C++
> > library, similar to how Arrow Flight is. The main reason is that Arrow core,
> > being such a widely used library, benefits more from being stable, while
> > Acero, being a relatively new and standalone component, benefits more from
> > fast iteration / quick experimentation. My colleague and I are working on
> > https://github.com/apache/arrow/issues/15280 to make this happen.
> >
> >
> >
> >
> >
> > On Fri, Mar 10, 2023 at 5:59 AM Andrew Lamb <al...@influxdata.com> wrote:
> >
> >> I don't know much about the Acero user base, but gathering some significant
> >> term users (e.g. Ballista, Urban Logiq, GreptimeDB, InfluxDB IOx, etc) has
> >> been very helpful for DataFusion. Not only do such users bring some amount
> >> of maintenance capacity, but perhaps more relevantly to your discussion they
> >> bring a focus to the project with their use cases.
> >>
> >> With so many possible tradeoffs (e.g. streaming vs larger batch execution as
> >> you mention above), having people to help focus the choice of project has, I
> >> think, served DataFusion well.
> >>
> >> If Acero has such users (or potential users), perhaps reaching out to them /
> >> soliciting their ideas of where they want to see the project go would be a
> >> valuable focusing exercise.
> >>
> >> Andrew
> >>
> >> On Thu, Mar 9, 2023 at 6:35 PM Aldrin <akmon...@ucsc.edu.invalid> wrote:
> >>
> >>> Thanks for sharing! There are a variety of things that I didn't know about
> >>> (such as ExecBatchBuilder) and it's interesting to hear about the
> >>> performance challenges.
> >>>
> >>> How much would future Substrait work involve integration with Acero? I'm
> >>> curious how much more support of Substrait is seen as valuable (should be
> >>> prioritized) or if additional support is going to be "as-needed". Note that
> >>> I have a minimal understanding of how "large" Substrait is and what
> >>> proportion of it is already supported by Acero.
> >>>
> >>> Aldrin Montana
> >>> Computer Science PhD Student
> >>> UC Santa Cruz
> >>>
> >>>
> >>> On Thu, Mar 9, 2023 at 12:33 PM Antoine Pitrou <anto...@python.org> wrote:
> >>>> Just a reminder for those following other implementations of Arrow, that
> >>>> Acero is the compute/execution engine subsystem baked into Arrow C++.
> >>>>
> >>>> Regards
> >>>>
> >>>> Antoine.
> >>>>
> >>>>
> >>>> On 09/03/2023 at 21:20, Weston Pace wrote:
> >>>>> We are getting closer to another release.  I am thinking about what to
> >>>>> work on in the next release.  I think it is a good time to have a
> >>>>> discussion about Acero in general.  This is possibly also of interest to
> >>>>> those working on pyarrow or r-arrow as these libraries rely on Acero for
> >>>>> various functionality.  Apache projects have no single owner and what
> >>>>> follows is only my own personal opinion and plans.  Still, I will
> >>>>> apologize in advance for any lingering hubris or outrageous declarations
> >>>>> of fact :)
> >>>>>
> >>>>> First, some background.  Since we started the project the landscape has
> >>>>> changed.  Most importantly, there are now more arrow-native execution
> >>>>> engines.  For example, datafusion, duckdb, velox, and I'm sure there are
> >>>>> probably more.  Substrait has also been created, allowing users to
> >>>>> hopefully switch between different execution engines as different needs
> >>>>> arise.  Some significant contributors to Acero have taken a break or
> >>>>> moved onto other projects and new contributors have arrived with new
> >>>>> interests and goals (For example, an asof join node and more focus on
> >>>>> ordered / streaming execution).
> >>>>>
> >>>>> I do not personally have the resources for bringing Acero's performance
> >>>>> to match that of some of the other execution engines.  I'm also not
> >>>>> aware of any significant contributors attempting to do so.  I also think
> >>>>> that having yet another engine racing to the top of the TPC-H benchmarks
> >>>>> is not the best thing we can be doing for our users.  To be clear, our
> >>>>> performance is not "bad" but it is not "state of the art".
> >>>>>
> >>>>> ## Some significant performance challenges for Acero:
> >>>>>
> >>>>>    1. Ideally an execution engine that wants to win TPC-H should operate
> >>>>> on L2 sized batches.  To risk stating the obvious: that is not very
> >>>>> large.  Typically less than 100k rows.  At that size of operation the
> >>>>> philosophy of "we are only doing this per-batch so we don't have to be
> >>>>> worried about performance" falls apart.  Significant pieces of Acero are
> >>>>> not built to operate effectively at this small of a batch size.  This is
> >>>>> probably most evident in our expression evaluation and in queries that
> >>>>> have complex expressions invoking many functions.
> >>>>>
> >>>>>    2. Our expression evaluation is missing a fair number of
> >>>>> optimizations.  The ability to use temporary vectors instead of
> >>>>> allocating new vectors between function calls.  Usage of selection
> >>>>> vectors to avoid materializing filter results.  General avoidance of
> >>>>> allocation and preference for thread local data.
> >>>>>
> >>>>>    3. Writing a library of compute functions that is compact, able to
> >>>>> run in any architecture, and able to take full advantage of the
> >>>>> underlying hardware is an extremely difficult challenge and there are
> >>>>> likely things that could be improved in our kernel functions.
> >>>>>
> >>>>>    4. Acero does no query optimization.  Hopefully Substrait optimizers
> >>>>> will emerge to fill this gap.  In the meantime, this remains a
> >>>>> significant gap when comparing Acero to most other execution engines.
> >>>>>
> >>>>> I am not (personally) planning on addressing any of the above issues (no
> >>>>> time and little interest).  Furthermore, other execution engines either
> >>>>> already handle these things or they are investing significant funds to
> >>>>> make sure they can.  In fact, I would be in favor of explicitly
> >>>>> abandoning the morsel-batch model and focusing on larger batch sizes in
> >>>>> the spirit of simplicity.
> >>>>>
> >>>>> This does not mean that I want to abandon Acero.  Acero is valuable for
> >>>>> a number of users who don't need that last 20% of performance and would
> >>>>> rather not introduce a new library.  Acero has been a valuable building
> >>>>> block for those that are exploring unique execution models or whose
> >>>>> workloads don't cleanly fit into an SQL query.  Acero has been used
> >>>>> effectively for academic research.  Acero has been valuable for me
> >>>>> personally as a sort of "reference implementation" for a Substrait
> >>>>> consumer as well as being a reference engine for connectivity and
> >>>>> decentralization in general.
> >>>>>
> >>>>> ## My roadmap
> >>>>>
> >>>>> Over the next year I plan on transitioning more time into Substrait
> >>>>> work.  But this is less because I am abandoning Acero and more because I
> >>>>> would like to start wrapping Acero up.  In my mind, Acero as an
> >>>>> "extensible streaming execution engine" is very nearly "complete" (as
> >>>>> much as anything is ever complete).
> >>>>>
> >>>>> 1. One significant remaining challenge is getting some better tools in
> >>>>> place for reducing runtime memory usage.  This mostly equates to being
> >>>>> smarter about scanning (in particular how we scan large row groups) and
> >>>>> adding support for spilling to pipeline breakers (there is a promising
> >>>>> PR for this that I have not yet been able to get around to).  I would
> >>>>> like to find time to address these things over the next year.
> >>>>>
> >>>>> 2. I would like Acero to be better documented and more extensible.  It
> >>>>> should be relatively simple (and hopefully as foolproof as possible) for
> >>>>> users to create their own extension nodes.  Perhaps we could even
> >>>>> support python extension nodes.  There has been some promising work
> >>>>> around Substrait extension nodes which I think could be generalized to
> >>>>> allow extension node developers to use Substrait without having to
> >>>>> create .proto files.
> >>>>>
> >>>>> 3. Finally, pyarrow is a toolbox.  I would like to see some of the
> >>>>> internal compute utilities exposed as their own tools (and pyarrow
> >>>>> bindings added).  Significantly (though I don't think I'll get to all of
> >>>>> these):
> >>>>>
> >>>>>    * The ExecBatchBuilder is a useful accumulation tool.  It could be
> >>>>> used, for example, to load datasets into pandas that are almost as big
> >>>>> as RAM (today you typically need at least 2x memory to convert to
> >>>>> pandas).
> >>>>>    * The GroupingSegmenter could be used to support workflows like
> >>>>> "Group by X and then give me a pandas dataframe for each group".
> >>>>>    * Whatever utilities we develop for spilling could be useful as
> >>>>> temporary caches.
> >>>>>    * There is an entire row based encoding and hash table built in there
> >>>>> somewhere.
> >>>>>
> >>>>> There are also a few things that I would love to see added but I don't
> >>>>> expect to be able to get to it myself anytime soon.  If anyone is
> >>>>> interested feel free to reach out and I'd be happy to brainstorm
> >>>>> implementation.  Off the top of my head:
> >>>>>
> >>>>>    * Support for window functions (for those of you that are not SQL
> >>>>> insiders this means "functions that rely on row order", like cumulative
> >>>>> sum, rank, or lag)
> >>>>>      * We have most of the basic building blocks and so a relatively
> >>>>> naive implementation shouldn't be a huge stretch.
> >>>>>    * Support for a streaming merge join (e.g. if the keys are ordered on
> >>>>> both inputs you don't have to accumulate all of one input)
> >>>>>
> >>>>> I welcome any input and acknowledge that any or all of this could change
> >>>>> completely in the next 3 months.
> >>>>>
