+1 for me. What you are describing is a good idea, and having different ways to provide sources, especially through Substrait and Python, is something we can very much use right away. I think we are probably pretty close to having the components you describe. I'm not quite sure I follow all of the details, but those things can be worked out in PRs.
On Fri, Jun 24, 2022 at 12:51 AM Yaron Gvili <rt...@hotmail.com> wrote:
>
> I'm using schema-carrying tabular data as a more general term than a RecordBatch stream. For example, an ExecBatch stream or a vector of equal-length Arrays plus an associated schema would also qualify. The Python data-source node would be responsible for converting from the Python data-source function to invocations of InputReceived on the node's output - the SourceNode and TableSourceNode classes in source_node.cc are examples of this structure. Over time, the Python data-source node could extend its support to various Python data-source interfaces.
>
> > A very similar interface is the RecordBatchReader
>
> The PyArrow version of this (https://kou.github.io/arrow-site/docs/python/generated/pyarrow.RecordBatchReader.html) is one Python data-source interface that makes sense to support. Since the overall goal of my project is to support existing Python-based data-sources that were not necessarily designed for PyArrow, some adapters would surely be needed too.
>
> > I think a source node that wraps a RecordBatchReader would be a great idea; we probably have something similar to this
>
> Indeed, TableSourceNode in source_node.cc seems to be similar enough - it converts a Table to an ExecBatch generator, though it consumes eagerly. A RecordBatchReader would need to be consumed lazily.
>
> > Would a Python adapter to RecordBatchReader be sufficient? Or is something different needed?
>
> As noted above, this is one interface that makes sense to support.
>
> > How is cancellation handled? For example, the user gives up and cancels the query early. (Acero doesn't handle cancellation well at the moment, but I'm hoping to work on that, and cancellable sources are an important piece.)
>
> This question is more about the implementation of (an adapter for) a Python-based data-source than about its integration, which is the focus of the design I'm proposing. Taking a stab at it anyway: one solution involves a sorting-queue (ordered by timestamp or index, where insertions cover disjoint intervals) that coordinates the parallel producer threads with the (source node's) consumer thread. A batch is allowed to be fetched only when it is current (i.e., when it is clear no batch coming before it could be inserted later), and cancellation is done by disabling the queue (e.g., making it return a suitable error upon insertion).
>
> > Can the function be called reentrantly? In other words, can we call the function before the previous call finishes if we want to read the source in parallel?
>
> IIUC, you have in mind a ReadNext kind of function. The design does not specify that the Python-based data-source must have this interface. If the Python-based data-source is only sequentially accessible, then it makes sense to write a RecordBatchReader (or ExecBatch reader) adapter for it that has this kind of ReadNext function, and then the reentrancy problem is moot since no parallel access occurs. OTOH, if the Python-based data-source can be accessed in parallel, the sorting-queue solution above is better suited and would avoid the reentrancy problem of a ReadNext function.
>
> Yaron.
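To make the sorting-queue idea above concrete, here is a minimal Python sketch. Everything in it is hypothetical: the `SortingQueue` name and methods, the assumption that batch indices are unique, and the omission of end-of-stream handling. A real implementation would live next to the source node in C++.

```
import heapq
import threading

class SortingQueue:
    """Hypothetical coordinator between parallel producer threads and a
    single consumer thread. Batches carry a unique integer index; a batch
    may be fetched only when it is "current", i.e. no earlier-indexed
    batch can still arrive. End-of-stream handling is omitted."""

    def __init__(self):
        self._heap = []        # min-heap of (index, batch); indices unique
        self._next = 0         # index the consumer expects next
        self._cancelled = False
        self._cond = threading.Condition()

    def insert(self, index, batch):
        """Called by producer threads; fails once the queue is disabled."""
        with self._cond:
            if self._cancelled:
                raise RuntimeError("source cancelled")
            heapq.heappush(self._heap, (index, batch))
            self._cond.notify_all()

    def fetch(self):
        """Called by the consumer; blocks until the next in-order batch is
        available, or returns None after cancellation."""
        with self._cond:
            while not self._cancelled and (
                    not self._heap or self._heap[0][0] != self._next):
                self._cond.wait()
            if self._cancelled:
                return None
            _, batch = heapq.heappop(self._heap)
            self._next += 1
            return batch

    def cancel(self):
        """Disables the queue: producers start failing, the consumer wakes."""
        with self._cond:
            self._cancelled = True
            self._cond.notify_all()
```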
> ________________________________
> From: Weston Pace <weston.p...@gmail.com>
> Sent: Thursday, June 23, 2022 8:21 PM
> To: dev@arrow.apache.org <dev@arrow.apache.org>
> Subject: Re: user-defined Python-based data-sources in Arrow
>
> This seems reasonable to me. A very similar interface is the RecordBatchReader [1], which is roughly (glossing over details)...
>
> ```
> class RecordBatchReader {
>   virtual std::shared_ptr<Schema> schema() const = 0;
>   virtual Result<std::shared_ptr<RecordBatch>> Next() = 0;
>   virtual Status Close() = 0;
> };
> ```
>
> This seems pretty close to what you are describing. I think a source node that wraps a RecordBatchReader would be a great idea; we probably have something similar to this. So my questions would be:
>
> * Would a Python adapter to RecordBatchReader be sufficient? Or is something different needed?
> * How is cancellation handled? For example, the user gives up and cancels the query early. (Acero doesn't handle cancellation well at the moment, but I'm hoping to work on that, and cancellable sources are an important piece.)
> * Can the function be called reentrantly? In other words, can we call the function before the previous call finishes if we want to read the source in parallel?
>
> [1] https://github.com/apache/arrow/blob/86915807af6fe10f44bc881e57b2f425f97c56c7/cpp/src/arrow/record_batch.h#L219
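As a data point for the first question above: in recent PyArrow versions a plain Python generator can already be wrapped as a pyarrow.RecordBatchReader via RecordBatchReader.from_batches, and the reader is consumed lazily. A minimal sketch, where `legacy_chunks` is a made-up stand-in for an existing Python data source not designed for PyArrow:

```
import pyarrow as pa

schema = pa.schema([("x", pa.int64()), ("y", pa.string())])

def legacy_chunks():
    # Stand-in for an existing Python data source; any chunked source works.
    yield {"x": [1, 2, 3], "y": ["a", "b", "c"]}
    yield {"x": [4, 5], "y": ["d", "e"]}

def as_batches():
    # Adapt each chunk to a RecordBatch; this generator runs lazily,
    # one chunk at a time, as the reader is consumed.
    for chunk in legacy_chunks():
        yield pa.RecordBatch.from_pydict(chunk, schema=schema)

# from_batches mirrors the schema()/Next()/Close() shape of the C++ class.
reader = pa.RecordBatchReader.from_batches(schema, as_batches())
for batch in reader:
    print(batch.num_rows)  # 3, then 2
```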
> On Wed, Jun 22, 2022 at 9:47 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > Sure, it can be found at https://lists.apache.org/thread/o2nc7jnmfpt8lhcnjths1gnzvy86yfxo . Compared to that thread, the design proposed here is more mature, now that I have a reasonable version of the Ibis and Ibis-Substrait parts implemented locally (if it helps this discussion, I could provide some details about this implementation). I no longer propose registering the data-source function nor using arrow::compute::Function for it, since it would be directly added to a source execution node, be it manually or via deserialization of a Substrait plan. Also, I now define the data-source function as producing schema-carrying tabular data.
> >
> > Yaron.
> > ________________________________
> > From: Li Jin <ice.xell...@gmail.com>
> > Sent: Wednesday, June 22, 2022 2:50 PM
> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > Subject: Re: user-defined Python-based data-sources in Arrow
> >
> > Yaron,
> >
> > Do you mind also linking the previous mailing list discussion here?
> >
> > On Wed, Jun 22, 2022 at 11:40 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'd like to get the community's feedback about a design proposal (discussed below) for integrating user-defined Python-based data-sources in Arrow. This is part of a larger project I'm working on to provide end-to-end (Ibis/Ibis-Substrait/Arrow) support for such data-sources.
> > >
> > > A user-defined Python-based data-source is basically a function implemented in Python that takes no arguments, returns schema-carrying tabular data (e.g., a dataframe or a record-batch stream), and exposes the schema. Normally, such a function would be generated by a factory-function that does take arguments, embedding them (or values derived from them) in the returned data-source function. The data-source function is intended to be integrated within an input execution node of an Acero execution plan.
> > >
> > > This suggests distinguishing between a few data-source roles:
> > >
> > >   * Author: the person/component implementing the data-source factory function
> > >   * Producer: the person/component creating a specific data-source function
> > >   * Consumer: the person/component sourcing data using the specific data-source function
> > >
> > > In an end-to-end scenario (whose design details I'm leaving out here), authoring would be done using Python, producing using Ibis, serialization using Ibis-Substrait, and consuming using PyArrow+Acero.
> > >
> > > In Arrow, the integration of a user-defined data-source would involve these steps:
> > >
> > >   * A data-source function is obtained, either as an argument to a PyArrow API or by deserializing it from a Substrait plan in which it is encoded (I have this encoding of Python functions working locally)
> > >   * The data-source function is wrapped using Cython (similar to Python scalar UDFs - see https://github.com/apache/arrow/pull/12590) and held by an input execution node implemented in C++
> > >   * One or more such input execution nodes are created as part of assembling an Acero execution plan
> > >   * Each input execution node uses the data-source function it holds to:
> > >     * expose the schema of the data-source function via Acero APIs
> > >     * source data and convert it to record-batches that are pushed on to the next node in the plan
> > >
> > > Yaron.
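The factory-function contract and the three roles in the proposal above can be illustrated with a short Python sketch. All names here (`make_range_source`, the `schema` attribute, the `source()` shape) are hypothetical illustrations of the proposed contract, not existing Arrow APIs:

```
import pyarrow as pa

def make_range_source(start, stop, batch_size):
    """Author side: a hypothetical factory-function; its arguments are
    embedded in the returned no-argument data-source function."""
    schema = pa.schema([("value", pa.int64())])

    def source():
        # Returns schema-carrying tabular data: here, a lazily consumed
        # RecordBatchReader over the embedded [start, stop) range.
        def batches():
            for lo in range(start, stop, batch_size):
                hi = min(lo + batch_size, stop)
                yield pa.RecordBatch.from_pydict(
                    {"value": list(range(lo, hi))}, schema=schema)
        return pa.RecordBatchReader.from_batches(schema, batches())

    source.schema = schema  # let an input node expose the schema up front
    return source

# Producer side: bind concrete arguments to obtain a specific data source.
src = make_range_source(0, 10, batch_size=4)

# Consumer side (the proposed Acero input node): call with no arguments,
# expose src.schema, and push the resulting record-batches downstream.
for batch in src():
    print(batch.num_rows)  # 4, 4, 2
```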