Re: user-defined Python-based data-sources in Arrow
Yaron,

Do you mind also linking the previous mailing list discussion here?

On Wed, Jun 22, 2022 at 11:40 AM Yaron Gvili wrote:

> Hi,
>
> I'd like to get the community's feedback on a design proposal (discussed
> below) for integrating user-defined Python-based data-sources in Arrow.
> This is part of a larger project I'm working on to provide end-to-end
> (Ibis/Ibis-Substrait/Arrow) support for such data-sources.
>
> A user-defined Python-based data-source is basically a function
> implemented in Python that takes no arguments and returns schema-carrying
> tabular data, e.g., a dataframe or a record-batch stream, and that also
> exposes the schema. Normally, such a function would be generated by a
> factory-function that does take arguments and embeds them (or values
> derived from them) in the returned data-source function. The data-source
> function is intended to be integrated within an input execution node of
> an Acero execution plan.
>
> This suggests distinguishing between several data-source roles:
>
> * Author: the person/component implementing the data-source factory
>   function
> * Producer: the person/component creating a specific data-source function
> * Consumer: the person/component sourcing data using the specific
>   data-source function
>
> In an end-to-end scenario (whose design details I'm leaving out here),
> authoring would be done using Python, producing using Ibis, serialization
> using Ibis-Substrait, and consuming using PyArrow+Acero.
>
> In Arrow, the integration of a user-defined data-source would involve
> these steps:
>
> * A data-source function is obtained, either as an argument to a PyArrow
>   API or by deserializing it from a Substrait plan in which it is encoded
>   (I have this encoding of Python functions working locally)
> * A data-source function is wrapped using Cython (similar to Python
>   scalar UDFs - see https://github.com/apache/arrow/pull/12590) and held
>   by an input execution node implemented in C++
> * One or more such input execution nodes are created as part of
>   assembling an Acero execution plan
> * Each input execution node uses the data-source function it holds to
>   * expose via Acero APIs the schema of the data-source function
>   * source data and convert it to record-batches that are pushed on to
>     the next node in the plan
>
> Yaron.
Re: user-defined Python-based data-sources in Arrow
Sure, it can be found at
https://lists.apache.org/thread/o2nc7jnmfpt8lhcnjths1gnzvy86yfxo . Compared
to this thread, the design proposed here is more mature, now that I have a
reasonable version of the Ibis and Ibis-Substrait parts implemented locally
(if it helps this discussion, I could provide some details about this
implementation). I no longer propose registering the data-source function
or using arrow::compute::Function for it, since it would be added directly
to a source execution node, either manually or via deserialization of a
Substrait plan. Also, I now define the data-source function as producing
schema-carrying tabular data.

Yaron.

________________________________
From: Li Jin
Sent: Wednesday, June 22, 2022 2:50 PM
To: dev@arrow.apache.org
Subject: Re: user-defined Python-based data-sources in Arrow

Yaron,

Do you mind also linking the previous mailing list discussion here?
Re: user-defined Python-based data-sources in Arrow
This seems reasonable to me. A very similar interface is the
RecordBatchReader [1], which is roughly (glossing over details)...

```
class RecordBatchReader {
  virtual std::shared_ptr<Schema> schema() const = 0;
  virtual Result<std::shared_ptr<RecordBatch>> Next() = 0;
  virtual Status Close() = 0;
};
```

This seems pretty close to what you are describing. I think a source node
that wraps a RecordBatchReader would be a great idea; we probably have
something similar to this. So my questions would be:

* Would a python adapter to RecordBatchReader be sufficient? Or is
  something different needed?
* How is cancellation handled? For example, the user gives up and cancels
  the query early. (Acero doesn't handle cancellation well at the moment,
  but I'm hoping to work on that, and cancellable sources are an important
  piece.)
* Can the function be called reentrantly? In other words, can we call the
  function before the previous call finishes if we want to read the source
  in parallel?

[1] https://github.com/apache/arrow/blob/86915807af6fe10f44bc881e57b2f425f97c56c7/cpp/src/arrow/record_batch.h#L219
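[Editorial note: for illustration, here is a plain-Python mirror of the RecordBatchReader interface sketched above (schema / Next / Close), with dicts of column lists standing in for record batches. PyBatchReader is a hypothetical name, not an Arrow class; the real PyArrow counterpart is pyarrow.RecordBatchReader.]

```python
class PyBatchReader:
    """Duck-typed mirror of the C++ RecordBatchReader interface: exposes a
    schema, yields batches one at a time, and can be closed."""

    def __init__(self, schema, batches):
        self._schema = schema
        self._it = iter(batches)
        self._closed = False

    def schema(self):
        return self._schema

    def next(self):
        # Returns the next batch, or None at end-of-stream (analogous to a
        # null shared_ptr returned by the C++ Next()).
        if self._closed:
            raise RuntimeError("reader is closed")
        return next(self._it, None)

    def close(self):
        self._closed = True


# A batch stream with one column "a", split across two batches.
reader = PyBatchReader(["a"], [{"a": [1, 2]}, {"a": [3]}])
```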
Re: user-defined Python-based data-sources in Arrow
I'm using "schema-carrying tabular data" as a more general term than a
RecordBatch stream. For example, an ExecBatch stream, or a vector of
equal-length Arrays plus an associated schema, is also fine. The Python
data-source node would be responsible for converting from the Python
data-source function to invocations of InputReceived on the node's output -
the SourceNode and TableSourceNode classes in source_node.cc are examples
of this structure. Over time, the Python data-source node could extend its
support to various Python data-source interfaces.

> A very similar interface is the RecordBatchReader

The PyArrow version of this
(https://kou.github.io/arrow-site/docs/python/generated/pyarrow.RecordBatchReader.html)
is one Python data-source interface that makes sense to support. Since the
overall goal of my project is to support existing Python-based data-sources
that were not necessarily designed for PyArrow, some adapters would surely
be needed too.

> I think a source node that wraps a RecordBatchReader would be a great
> idea, we probably have something similar to this

Indeed, TableSourceNode in source_node.cc seems similar enough - it
converts a Table to an ExecBatch generator, though it consumes eagerly. A
RecordBatchReader would need to be consumed lazily.

> Would a python adapter to RecordBatchReader be sufficient? Or is
> something different?

As noted above, this is one interface that makes sense to support.

> How is cancellation handled? For example, the user gives up and cancels
> the query early. (Acero doesn't handle cancellation well at the moment
> but I'm hoping to work on that and cancellable sources is an important
> piece)?

This question is more about the implementation of (an adapter for) a
Python-based data-source than about its integration, which is the focus of
the design I'm proposing. Taking a stab at it anyway, I think one solution
involves using a sorting-queue (by timestamp or index, where insertions
cover disjoint intervals) to coordinate the parallel producing threads and
the (source node's) consumer thread. Accordingly, a batch is allowed to be
fetched only when it is current (i.e., when it is clear no batch coming
before it could be inserted later), and cancellation is done by disabling
the queue (e.g., making it return a suitable error upon insertion).

> Can the function be called reentrantly? In other words, can we call the
> function before the previous call finishes if we want to read the source
> in parallel?

IIUC, you have in mind a ReadNext kind of function. The design does not
specify that the Python-based data-source must have this interface. If the
Python-based data-source is only sequentially accessible, then it makes
sense to write a RecordBatchReader (or an ExecBatch reader) adapter for it
that has this kind of ReadNext function, and then the reentrancy problem is
moot since no parallel access occurs. OTOH, if the Python-based data-source
can be accessed in parallel, the above sorting-queue solution is better
suited and would avoid the reentrancy problem of a ReadNext function.

Yaron.

________________________________
From: Weston Pace
Sent: Thursday, June 23, 2022 8:21 PM
To: dev@arrow.apache.org
Subject: Re: user-defined Python-based data-sources in Arrow
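[Editorial note: the sorting-queue idea sketched above can be illustrated in plain Python. This simplifies "insertions cover disjoint intervals" to one integer index per batch; OrderedBatchQueue is an illustrative name, not part of any Arrow API.]

```python
import heapq
import threading


class OrderedBatchQueue:
    """Parallel producers insert (index, batch) pairs in any order; the
    consumer fetches a batch only once it is 'current' (no earlier-indexed
    batch can still arrive). Cancellation disables the queue so later
    insertions fail with an error."""

    def __init__(self):
        self._heap = []           # min-heap ordered by batch index
        self._next = 0            # index the consumer expects next
        self._cancelled = False
        self._cond = threading.Condition()

    def put(self, index, batch):
        with self._cond:
            if self._cancelled:
                raise RuntimeError("queue cancelled")
            heapq.heappush(self._heap, (index, batch))
            self._cond.notify_all()

    def get(self):
        with self._cond:
            # Wait until the batch at the front is the current one.
            while not (self._heap and self._heap[0][0] == self._next):
                if self._cancelled:
                    raise RuntimeError("queue cancelled")
                self._cond.wait()
            self._next += 1
            return heapq.heappop(self._heap)[1]

    def cancel(self):
        with self._cond:
            self._cancelled = True
            self._cond.notify_all()
```

Even if producers finish out of order, the consumer sees batches in index order, and a cancelled queue rejects further insertions, which is one way producing threads could learn to stop.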
Re: user-defined Python-based data-sources in Arrow
+1 from me. What you are describing is a good idea, and having different
ways to provide sources, especially through Substrait and Python, is
something we can very much use right away. I think we are probably pretty
close to having the components you describe. I'm not quite sure I follow
all of the details, but those things can be worked out in PRs.

On Fri, Jun 24, 2022 at 12:51 AM Yaron Gvili wrote: