Thanks for the detailed overview, Weston. I agree with David this would be very useful to have in a public doc.
Weston and David's discussion is a good one; however, I see it as separate from the discussion I brought up. The former is about facilities (like extension points) for implementing custom data sources in Arrow, whereas the latter is about facilities for integrating into PyArrow (existing or future) data sources written/wrapped in Python. In this latter discussion, I'm indifferent to the complexities of data source implementation. I'm especially interested in feedback about the new function-kind and the extensions of "cpp/src/arrow/python/udf.h" I proposed, as well as possible alternatives to these, and more generally in reaching consensus about how a custom data-source written/wrapped in Python would get integrated.

> At the moment as we
> are not exposing the execution engine primitives to Python user, are you
> expecting to expose them by this approach.
>
> From our side, these APIs are not directly exposed to the end user, but
> rather, primitives that allow us to build on top of.

For clarity of discussion, I'd suggest distinguishing between a data-source-integrator and an Acero-user (or end-user), since in many use cases these are not the same person. When I wrote user, I meant a data-source-integrator. An Acero-user would not be directly using the facilities I proposed.

Yaron.

________________________________
From: David Li <lidav...@apache.org>
Sent: Friday, June 3, 2022 5:53 PM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: data-source UDFs

Thanks for the overview of the different extension points, it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)

Just to chime in quickly here:

For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both.
Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.

-David

On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> Efficiently reading from a data source is something that has a bit of
> complexity (parsing files, connecting to remote data sources, managing
> parallel reads, etc.) Ideally we don't want users to have to reinvent
> these things as they go. The datasets module in Arrow-C++ has a lot
> of code here already.
>
> So I think there is probably more than one extension point. Some of
> these extension points already exist. I do believe there is
> opportunity to create further extension points as well and the
> challenge / opportunity here will be figuring out what those are and
> what their API should be.
>
> ## I can describe a little bit about what we have already:
>
> * Filesystem abstraction
>
> Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> which is pretty well documented and straightforward. This is how we
> can swap between local disk, S3, etc. From an Acero / datasets
> perspective the API is basically "given a path, give me a stream of
> bytes" (open file) and "given a path, give me a list of files" (list
> directory).
>
> * FileFormat abstraction
>
> The file format abstraction (arrow::dataset::FileFormat) is how we
> swap out different kinds of files. For example,
> arrow/orc/parquet/csv/json/... The API is roughly (glossing over
> plenty of details)
>
> - Convert input file to schema (inspect)
> - Convert input file to a stream of batches (scan)
> - Convert a stream of batches to an output file (write)
>
> * Fragment / Dataset abstraction
>
> The fragment (arrow::dataset::Fragment) & dataset
> (arrow::dataset::Dataset) APIs are how we describe a collection of
> files.
> This is used by the scanner to implement parallel reads. You
> can think of these as the "source" API. The APIs are roughly
>
> - Convert a dataset to a stream of fragments (list dataset)
> - Convert a fragment to a stream of batches (scan)
>
> The two main implementations of datasets that we have today are
> FilesystemDataset (uses a filesystem to list files, each file is a
> fragment. A filesystem fragment uses a format to convert its file to
> a stream of batches) and the InMemoryDataset (there is one fragment
> and the scan operation is just slicing off pieces of the in-memory
> data). There are also some niche implementations here like a dataset
> that is created from a python iterable of batches. This might be very
> similar to what you are describing above.
>
> A dataset must be created with a "dataset schema" which is the single
> schema that all fragments of the dataset can be converted to.
>
> * Custom source nodes
>
> All of the above is exposed to Acero via the scan node which is
> responsible for turning a dataset into Acero input. However, the
> datasets API could be bypassed entirely to feed Acero in other ways.
> Some examples:
>
> - The table source node is a way to feed in-memory data (a table)
> into Acero. This is very similar to the InMemoryDataset but bypasses
> some of the overhead of the scanner.
> - The TPC-H source node generates random data for benchmarking purposes.
>
> A lot of things can be expressed both as a simple dataset or a custom
> source node. There is a bit of duplication here and I don't know that
> it matters too much. I'm just pointing this out for pedantic
> purposes.
>
> The API here is just the ExecNode API and so the user needs to provide
> something that starts when StartProducing is called and then calls
> InputReceived on a regular basis. From the discussions on the
> scheduler I suspect this API may be changing slightly but the idea is
> still there.
>
> ## I'm also aware of a number of things we are still going to need at
> some point.
>
> * Evolution
>
> Sometimes different files in a dataset have different schemas. A very
> common case is fields getting added over time or fields getting
> renamed or changing data type (e.g. int32 -> int64). We have some
> support for the former but none for the latter. I've got a pretty
> good idea of what the API looks like for "evolution" so if that is
> something needed I could write that up.
>
> * Flight dataset/fragment/source-node?
>
> I don't think there will be exactly a "FlightFragment". Maybe the
> right term is ADBC, but I haven't been following that discussion
> closely enough. There needs to be a way to scan a remote data source
> that provides its data via a standard Flight service.
>
> * SQL dataset/fragment/source-node?
>
> It could be very useful to have a dataset that is capable of reading
> data from SQL datasets via something like JDBC (although, if ADBC
> connectors get built quickly enough, maybe this is never needed :)
>
> * Catalogs
>
> The filesystem dataset currently figures out the dataset schema
> through a rather expensive inspection process and it lists its
> fragments using potentially expensive directory listing. Metadata
> catalogs (e.g. Hive) and table formats (e.g. Iceberg) often have ways
> of storing precomputed versions of this information. A dataset that
> is capable of figuring out what files to scan from a catalog would be
> valuable. This dataset might use the same filesystem fragment to do
> the actual scan.
>
> * Table metadata
>
> Very similar to the catalogs discussion is the idea of "table
> metadata". This is less about reading data and more about describing
> the data. For example, metadata about any ordering of the incoming
> data, unique constraints, not-null constraints, etc. All of this
> information can be used by exec nodes to simplify query processing.
> For example, if you are grouping on a set of keys and one of the keys
> is ordered then you can implement group by with a streaming (not
> pipeline breaking) implementation.
>
>> that allows utilizing existing Python APIs that knows how to read data
>> source as a stream of record batches.
>
> We have a class called arrow::dataset::<unnamed>::OneShotFragment
> which knows how to convert a python iterator into a scannable source.
> This might serve your needs. This was also written when things were
> more dataset-oriented. It might also be interesting to create a
> python source node which does the same sort of thing, bypassing the
> scanner, although I don't know that there would be much concrete
> benefit.
>
> I hope this information is helpful! It is just background though. I
> think I might need to understand your needs in a bit more detail
> before I can offer any kind of prescriptive advice.
>
> On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>> Actually, "UDF" might be the wrong terminology here - This is more of a
>> "custom Python data source" than "Python user defined functions". (Although
>> under the hood it can probably reuse lots of the UDF logic to execute the
>> custom data source)
>>
>> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ice.xell...@gmail.com> wrote:
>>
>> > What Yaron is going for is really something similar to custom data source
>> > in Spark (
>> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
>> > that allows utilizing existing Python APIs that knows how to read data
>> > source as a stream of record batches.
>> >
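
To make the "dataset created from a Python iterable of batches" idea from the thread concrete, here is a minimal plain-Python sketch of the two operations described above: a dataset lists its fragments, and each fragment is scanned into a stream of batches. It deliberately does not use the Arrow or PyArrow API. `Batch`, `OneShotFragment` (named after the class mentioned in the thread, but not its actual implementation), `IterableDataset`, and `read_my_source` are all hypothetical stand-ins, and a "batch" is modelled as a dict of column name to values.

```python
from typing import Dict, Iterable, Iterator, List

# Stand-in for a record batch: column name -> list of values (assumption).
Batch = Dict[str, list]


class OneShotFragment:
    """Hypothetical fragment wrapping a Python iterator; scannable once."""

    def __init__(self, batches: Iterable[Batch]):
        self._batches = iter(batches)
        self._consumed = False

    def scan(self) -> Iterator[Batch]:
        # An iterator cannot be rewound, so a second scan is an error.
        if self._consumed:
            raise RuntimeError("one-shot fragment was already scanned")
        self._consumed = True
        return self._batches


class IterableDataset:
    """Hypothetical dataset: a dataset schema plus a list of fragments."""

    def __init__(self, schema: List[str], fragments: Iterable[OneShotFragment]):
        self.schema = schema
        self._fragments = list(fragments)

    def get_fragments(self) -> Iterator[OneShotFragment]:
        # "Convert a dataset to a stream of fragments (list dataset)"
        return iter(self._fragments)

    def scan(self) -> Iterator[Batch]:
        # "Convert a fragment to a stream of batches (scan)", per fragment.
        for fragment in self.get_fragments():
            for batch in fragment.scan():
                # Every batch must match the single dataset schema.
                if list(batch) != self.schema:
                    raise ValueError(f"batch columns {list(batch)} != {self.schema}")
                yield batch


def read_my_source() -> Iterator[Batch]:
    """Hypothetical existing Python reader that yields batches lazily."""
    for start in range(0, 6, 2):
        yield {"id": [start, start + 1], "value": [start * 10, (start + 1) * 10]}


dataset = IterableDataset(["id", "value"], [OneShotFragment(read_my_source())])
batches = list(dataset.scan())
```

The integrator only supplies `read_my_source`; the wrapper owns the schema check and the single-use constraint, which is the split between a data-source-integrator and an Acero-user suggested earlier in the thread.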