I'm using schema-carrying tabular data as a more general term than a 
RecordBatch stream. For example, an ExecBatch stream or a vector of 
equal-length Arrays plus associated schema is also good. The Python data-source 
node would be responsible for converting from the Python data-source function 
to invocations of InputReceived on the node's output - the SourceNode and 
TableSourceNode classes in source_node.cc are examples of this structure. Over 
time, the Python data-source node could extend its support to various Python 
data-source interfaces.

> A very similar interface is the RecordBatchReader

The PyArrow version of this 
(https://kou.github.io/arrow-site/docs/python/generated/pyarrow.RecordBatchReader.html)
 is one Python data-source interface that makes sense to support. Since the 
overall goal of my project is to support existing Python-based data-sources 
that were not necessarily designed for PyArrow, some adapters would surely be 
needed too.

> I think a source node that wraps a RecordBatchReader would be a great idea, 
> we probably have something similar to this

Indeed, TableSourceNode in source_node.cc seems to be similar enough - it 
converts a Table to an ExecBatch generator, though it consumes eagerly. A 
RecordBatchReader would need to be consumed lazily.

> Would a python adapter to RecordBatchReader be sufficient?  Or is something 
> different?

As noted above, this is one interface that makes sense to support.

> How is cancellation handled?  For example, the user gives up and cancels the 
> query early. (Acero doesn't handle cancellation well at the moment but I'm 
> hoping to work on that and cancellable sources is an important piece)?

This question is more about the implementation of (an adapter for) a 
Python-based data source than about its integration, which is the focus of the 
design I'm proposing. Taking a stab at this question, I think one solution 
involves using a sorting-queue (by timestamp or index, where insertions cover 
disjoint intervals) to coordinate parallel producing threads and the (source 
node's) consumer thread. Accordingly, a batch is allowed to be fetched only 
when it is current (i.e., when it is clear no batch coming before it could be 
inserted later) and cancellation is done by disabling the queue (e.g., making 
it return a suitable error upon insertion).

> Can the function be called reentrantly?  In other words, can we call the 
> function before the previous call finishes if we want to read the source in 
> parallel?

IIUC, you have in mind a ReadNext kind of function. The design does not specify 
that the Python-based data-source must have this interface. If the Python-based 
data-source is only sequentially accessible, then it makes sense to write a 
RecordBatchReader (or an ExecBatch reader) adapter for it that has this kind of 
ReadNext function, and then the reentrancy problem is mot since no 
parallel-access occurs. OTOH, if the Python-based data-source can be accessed 
in parallel, the above sorting-queue solution is better suited and would avoid 
the reentrancy problem of a ReadNext function.


Yaron.
________________________________
From: Weston Pace <weston.p...@gmail.com>
Sent: Thursday, June 23, 2022 8:21 PM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: user-defined Python-based data-sources in Arrow

This seems reasonable to me.  A very similar interface is the
RecordBatchReader[1] which is roughly (glossing over details)...

```
class RecordBatchReader {
  virtual std::shared_ptr<Schema> schema() const = 0;
  virtual Result<std::shared_ptr<RecordBatch>> Next() = 0;
  virtual Status Close() = 0;
};
```

This seems pretty close to what you are describing.  I think a source
node that wraps a RecordBatchReader would be a great idea, we probably
have something similar to this.  So my questions would be:

 * Would a python adapter to RecordBatchReader be sufficient?  Or is
something different?
 * How is cancellation handled?  For example, the user gives up and
cancels the query early. (Acero doesn't handle cancellation well at
the moment but I'm hoping to work on that and cancellable sources is
an important piece)?
 * Can the function be called reentrantly?  In other words, can we
call the function before the previous call finishes if we want to read
the source in parallel?

[1] 
https://github.com/apache/arrow/blob/86915807af6fe10f44bc881e57b2f425f97c56c7/cpp/src/arrow/record_batch.h#L219

On Wed, Jun 22, 2022 at 9:47 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> Sure, it can be found at 
> https://lists.apache.org/thread/o2nc7jnmfpt8lhcnjths1gnzvy86yfxo . Compared 
> to this thread, the design proposed here is more mature, now that I have a 
> reasonable version of the Ibis and Ibis-Substrait parts implemented locally 
> (if it helps this discussion, I could provide some details about this 
> implementation). I no longer propose registering the data-source function nor 
> using arrow::compute::Function for it, since it would be directly added to a 
> source execution node, be it manually or via deserialization of a Substrait 
> plan. Also, I now define the data-source function as producing 
> schema-carrying tabular data.
>
>
> Yaron.
> ________________________________
> From: Li Jin <ice.xell...@gmail.com>
> Sent: Wednesday, June 22, 2022 2:50 PM
> To: dev@arrow.apache.org <dev@arrow.apache.org>
> Subject: Re: user-defined Python-based data-sources in Arrow
>
> Yaron,
>
> Do you mind also linking the previous mailing list discussion here?
>
> On Wed, Jun 22, 2022 at 11:40 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> > Hi,
> >
> > I'd like to get the community's feedback about a design proposal
> > (discussed below) for integrating user-defined Python-based data-sources in
> > Arrow. This is part of a larger project I'm working on to provide
> > end-to-end (Ibis/Ibis-Substrait/Arrow) support for such data-sources.
> >
> > A user-defined Python-based data-source is basically a function
> > implemented in Python that takes no arguments and returns schema-carrying
> > tabular data, e.g., a dataframe or a record-batch stream, as well as
> > exposes the schema. Normally, such a function would be generated by a
> > factory-function that does take arguments to embed them (or values derived
> > from them) in the returned data-source function. The data-source function
> > is intended to be integrated within an input execution node of an Acero
> > execution plan.
> >
> > This suggests distinguishing between a couple of data-source roles:
> >
> >   *   Author: the person/component implementing the data-source factory
> > function
> >   *   Producer: the person/component creating a specific data-source
> > function
> >   *   Consumer: the person/component sourcing data using the specific
> > data-source function
> >
> > In an end-to-end scenario (whose design details I'm leaving out here),
> > authoring would be done using Python, producing using Ibis, serialization
> > using Ibis-Substrait, and consuming using PyArrow+Acero.
> >
> > In Arrow, the integration of a user-defined data-source would involve
> > these steps:
> >
> >   *   A data-source function is obtained, either as an argument to a
> > PyArrow API or by deserializing from a Substrait plan in which it is
> > encoded (I have this encoding of Python functions working locally)
> >   *   A data-source function is wrapped using Cython (similar to Python
> > scalar UDFs - see https://github.com/apache/arrow/pull/12590) and held by
> > an input execution node implemented in C++
> >   *   One or more such input execution nodes are created as part of
> > assembling an Acero execution plan
> >   *   Each input execution node uses the data-source function it holds to
> >      *   expose via Acero APIs the schema of the data-source function
> >      *   source data and convert it to record-batches that are pushed on
> > to the next node in the plan
> >
> > Yaron.
> >

Reply via email to