+1 for me.  What you are describing is a good idea and having
different ways to provide sources, especially through Substrait and
Python, is something we can very much use right away.  I think we are
probably pretty close to having the components you describe.  I'm not
quite sure I follow all of the details but those things can be worked
out in PRs.
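
For readers skimming the thread, here is a minimal sketch of the data-source shape described in the quoted proposal: a factory function takes arguments and returns a zero-argument data-source function that exposes a schema and yields tabular data. It uses only the Python standard library. `DataSource`, `make_range_source`, and the list-of-tuples `Schema` and `Batch` types are hypothetical stand-ins, not PyArrow or Acero APIs.

```python
from typing import Callable, Iterator, List, Tuple

Schema = List[Tuple[str, str]]    # hypothetical stand-in for an Arrow schema
Batch = List[Tuple[int, float]]   # hypothetical stand-in for a record batch


class DataSource:
    """Zero-argument callable that also exposes its schema."""

    def __init__(self, schema: Schema, produce: Callable[[], Iterator[Batch]]):
        self.schema = schema
        self._produce = produce

    def __call__(self) -> Iterator[Batch]:
        return self._produce()


def make_range_source(n_rows: int, batch_size: int) -> DataSource:
    """Factory: embeds its arguments in the returned data-source function."""
    schema = [("id", "int64"), ("value", "float64")]

    def produce() -> Iterator[Batch]:
        # Generator: each batch is built only when the consumer asks for it.
        for start in range(0, n_rows, batch_size):
            stop = min(start + batch_size, n_rows)
            yield [(i, i * 0.5) for i in range(start, stop)]

    return DataSource(schema, produce)


# Consumer side: read the schema first, then pull batches.
source = make_range_source(n_rows=5, batch_size=2)
batches = list(source())
```

Because `produce` is a generator, a consuming node could pull one batch at a time instead of materializing the whole source up front.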

On Fri, Jun 24, 2022 at 12:51 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> I'm using schema-carrying tabular data as a more general term than a 
> RecordBatch stream. For example, an ExecBatch stream or a vector of 
> equal-length Arrays plus associated schema is also good. The Python 
> data-source node would be responsible for converting from the Python 
> data-source function to invocations of InputReceived on the node's output - 
> the SourceNode and TableSourceNode classes in source_node.cc are examples of 
> this structure. Over time, the Python data-source node could extend its 
> support to various Python data-source interfaces.
>
> > A very similar interface is the RecordBatchReader
>
> The PyArrow version of this 
> (https://kou.github.io/arrow-site/docs/python/generated/pyarrow.RecordBatchReader.html)
>  is one Python data-source interface that makes sense to support. Since the 
> overall goal of my project is to support existing Python-based data-sources 
> that were not necessarily designed for PyArrow, some adapters would surely be 
> needed too.
>
> > I think a source node that wraps a RecordBatchReader would be a great idea; 
> > we probably have something similar to this
>
> Indeed, TableSourceNode in source_node.cc seems to be similar enough - it 
> converts a Table to an ExecBatch generator, though it consumes eagerly. A 
> RecordBatchReader would need to be consumed lazily.
>
> > Would a Python adapter to RecordBatchReader be sufficient?  Or is something 
> > different needed?
>
> As noted above, this is one interface that makes sense to support.
>
> > How is cancellation handled?  For example, the user gives up and cancels 
> > the query early. (Acero doesn't handle cancellation well at the moment, but 
> > I'm hoping to work on that, and cancellable sources are an important piece.)
>
> This question is more about the implementation of (an adapter for) a 
> Python-based data source than about its integration, which is the focus of 
> the design I'm proposing. Taking a stab at this question, I think one 
> solution involves using a sorting-queue (by timestamp or index, where 
> insertions cover disjoint intervals) to coordinate parallel producing threads 
> and the (source node's) consumer thread. Accordingly, a batch is allowed to 
> be fetched only when it is current (i.e., when it is clear no batch coming 
> before it could be inserted later) and cancellation is done by disabling the 
> queue (e.g., making it return a suitable error upon insertion).
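
A minimal sketch of how the sorting-queue idea above might behave, using only the Python standard library. It simplifies "disjoint intervals" to unique consecutive integer indices starting at 0, which is an assumption made here for brevity. `OrderedBatchQueue` and `QueueDisabled` are hypothetical names, not Arrow APIs.

```python
import heapq
import threading


class QueueDisabled(Exception):
    """Raised on insert or fetch once the queue has been cancelled."""


class OrderedBatchQueue:
    def __init__(self):
        self._heap = []          # min-heap of (index, batch); indices are unique
        self._next_index = 0     # index of the batch that is "current"
        self._disabled = False
        self._cond = threading.Condition()

    def insert(self, index, batch):
        with self._cond:
            if self._disabled:
                raise QueueDisabled("queue was cancelled")
            heapq.heappush(self._heap, (index, batch))
            self._cond.notify_all()

    def fetch(self, timeout=None):
        """Return the current batch, blocking until it has been inserted."""
        def current_ready():
            return bool(self._heap) and self._heap[0][0] == self._next_index

        with self._cond:
            self._cond.wait_for(
                lambda: self._disabled or current_ready(), timeout=timeout
            )
            if self._disabled:
                raise QueueDisabled("queue was cancelled")
            if not current_ready():
                raise TimeoutError("current batch not yet available")
            _, batch = heapq.heappop(self._heap)
            self._next_index += 1
            return batch

    def disable(self):
        """Cancel: wake any waiting consumer and reject later inserts."""
        with self._cond:
            self._disabled = True
            self._cond.notify_all()


# Producers insert out of order; the consumer still sees index order.
queue = OrderedBatchQueue()
producers = [
    threading.Thread(target=queue.insert, args=(i, f"batch-{i}"))
    for i in (2, 0, 1)
]
for t in producers:
    t.start()
for t in producers:
    t.join()
fetched = [queue.fetch(timeout=1) for _ in range(3)]
queue.disable()
```

After `disable()`, a producer that tries to insert gets `QueueDisabled`, which is one way the cancellation signal could propagate back to producing threads.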
>
> > Can the function be called reentrantly?  In other words, can we call the 
> > function before the previous call finishes if we want to read the source in 
> > parallel?
>
> IIUC, you have in mind a ReadNext kind of function. The design does not 
> specify that the Python-based data-source must have this interface. If the 
> Python-based data-source is only sequentially accessible, then it makes sense 
> to write a RecordBatchReader (or an ExecBatch reader) adapter for it that has 
> this kind of ReadNext function, and then the reentrancy problem is moot since 
> no parallel-access occurs. OTOH, if the Python-based data-source can be 
> accessed in parallel, the above sorting-queue solution is better suited and 
> would avoid the reentrancy problem of a ReadNext function.
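
For the sequential case, a minimal standard-library sketch of a ReadNext-style adapter over a Python iterable might look as follows. `SequentialReader`, `three_batches`, and the `Schema` and `Batch` types are hypothetical stand-ins and do not reflect the actual PyArrow RecordBatchReader API. The sketch assumes a single consumer thread, so it takes no lock.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

Schema = List[Tuple[str, str]]   # hypothetical stand-in for an Arrow schema
Batch = List[int]                # hypothetical stand-in for a record batch


class SequentialReader:
    """Wraps a sequentially accessible source behind schema/read_next/close."""

    def __init__(self, schema: Schema, batches: Iterable[Batch]):
        self.schema = schema
        self._iterator = iter(batches)
        self._closed = False

    def read_next(self) -> Optional[Batch]:
        # Returns None at end of stream or after close().
        if self._closed:
            return None
        return next(self._iterator, None)

    def close(self) -> None:
        self._closed = True


def three_batches() -> Iterator[Batch]:
    # Lazily yields three batches of three integers each.
    for start in (0, 3, 6):
        yield list(range(start, start + 3))


reader = SequentialReader([("n", "int64")], three_batches())
seen = []
while (batch := reader.read_next()) is not None:
    seen.append(batch)
reader.close()
```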
>
>
> Yaron.
> ________________________________
> From: Weston Pace <weston.p...@gmail.com>
> Sent: Thursday, June 23, 2022 8:21 PM
> To: dev@arrow.apache.org <dev@arrow.apache.org>
> Subject: Re: user-defined Python-based data-sources in Arrow
>
> This seems reasonable to me.  A very similar interface is the
> RecordBatchReader[1] which is roughly (glossing over details)...
>
> ```
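> class RecordBatchReader {
>  public:
>   virtual ~RecordBatchReader() = default;
>   // Schema shared by every batch the reader yields.
>   virtual std::shared_ptr<Schema> schema() const = 0;
>   // Next batch, or nullptr once the stream is exhausted.
>   virtual Result<std::shared_ptr<RecordBatch>> Next() = 0;
>   virtual Status Close() = 0;
> };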
> ```
>
> This seems pretty close to what you are describing.  I think a source
> node that wraps a RecordBatchReader would be a great idea; we probably
> have something similar to this.  So my questions would be:
>
>  * Would a Python adapter to RecordBatchReader be sufficient?  Or is
> something different needed?
>  * How is cancellation handled?  For example, the user gives up and
> cancels the query early. (Acero doesn't handle cancellation well at
> the moment, but I'm hoping to work on that, and cancellable sources are
> an important piece.)
>  * Can the function be called reentrantly?  In other words, can we
> call the function before the previous call finishes if we want to read
> the source in parallel?
>
> [1] 
> https://github.com/apache/arrow/blob/86915807af6fe10f44bc881e57b2f425f97c56c7/cpp/src/arrow/record_batch.h#L219
>
> On Wed, Jun 22, 2022 at 9:47 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > Sure, it can be found at 
> > https://lists.apache.org/thread/o2nc7jnmfpt8lhcnjths1gnzvy86yfxo . Compared 
> > to this thread, the design proposed here is more mature, now that I have a 
> > reasonable version of the Ibis and Ibis-Substrait parts implemented locally 
> > (if it helps this discussion, I could provide some details about this 
> > implementation). I no longer propose registering the data-source function 
> > nor using arrow::compute::Function for it, since it would be directly added 
> > to a source execution node, be it manually or via deserialization of a 
> > Substrait plan. Also, I now define the data-source function as producing 
> > schema-carrying tabular data.
> >
> >
> > Yaron.
> > ________________________________
> > From: Li Jin <ice.xell...@gmail.com>
> > Sent: Wednesday, June 22, 2022 2:50 PM
> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > Subject: Re: user-defined Python-based data-sources in Arrow
> >
> > Yaron,
> >
> > Do you mind also linking the previous mailing list discussion here?
> >
> > On Wed, Jun 22, 2022 at 11:40 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'd like to get the community's feedback about a design proposal
> > > > (discussed below) for integrating user-defined Python-based data-sources in
> > > > Arrow. This is part of a larger project I'm working on to provide
> > > end-to-end (Ibis/Ibis-Substrait/Arrow) support for such data-sources.
> > >
> > > A user-defined Python-based data-source is basically a function
> > > implemented in Python that takes no arguments and returns schema-carrying
> > > > tabular data, e.g., a dataframe or a record-batch stream, and that also
> > > > exposes the schema. Normally, such a function would be generated by a
> > > factory-function that does take arguments to embed them (or values derived
> > > from them) in the returned data-source function. The data-source function
> > > is intended to be integrated within an input execution node of an Acero
> > > execution plan.
> > >
> > > > This suggests distinguishing between three data-source roles:
> > >
> > >   *   Author: the person/component implementing the data-source factory
> > > function
> > >   *   Producer: the person/component creating a specific data-source
> > > function
> > >   *   Consumer: the person/component sourcing data using the specific
> > > data-source function
> > >
> > > In an end-to-end scenario (whose design details I'm leaving out here),
> > > authoring would be done using Python, producing using Ibis, serialization
> > > using Ibis-Substrait, and consuming using PyArrow+Acero.
> > >
> > > In Arrow, the integration of a user-defined data-source would involve
> > > these steps:
> > >
> > >   *   A data-source function is obtained, either as an argument to a
> > > PyArrow API or by deserializing from a Substrait plan in which it is
> > > encoded (I have this encoding of Python functions working locally)
> > >   *   A data-source function is wrapped using Cython (similar to Python
> > > scalar UDFs - see https://github.com/apache/arrow/pull/12590) and held by
> > > an input execution node implemented in C++
> > >   *   One or more such input execution nodes are created as part of
> > > assembling an Acero execution plan
> > >   *   Each input execution node uses the data-source function it holds to
> > >      *   expose via Acero APIs the schema of the data-source function
> > >      *   source data and convert it to record-batches that are pushed on
> > > to the next node in the plan
> > >
> > > Yaron.
> > >
