One more point: it would seem beneficial if we could express this in a `RandomAccessFile::ReadAhead(vector<ReadRange>)` method, since no async buffering/coalescing would be needed. In the case of Parquet, we'd get the _exact_ ranges computed from the metadata. This method could also benefit other filesystems, since on Linux it can call `readahead` and/or `madvise`.
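To make that concrete, here is a rough standalone sketch of what such a hint could boil down to on a POSIX filesystem. This is not the actual Arrow API: `ReadRange` here is just a placeholder struct and `ReadAheadHint` is a made-up name for illustration only.

    // Hypothetical sketch -- not Arrow code. ReadRange/ReadAheadHint are
    // illustrative names, not real Arrow API.
    #include <fcntl.h>   // posix_fadvise, POSIX_FADV_WILLNEED
    #include <cstdint>
    #include <vector>

    struct ReadRange {
      int64_t offset;
      int64_t length;
    };

    // Hint the OS page cache that these byte ranges will be read soon.
    // On Linux this could equally be done with readahead(2), or with
    // madvise(MADV_WILLNEED) on a memory-mapped file.
    int ReadAheadHint(int fd, const std::vector<ReadRange>& ranges) {
      for (const auto& range : ranges) {
        int err = posix_fadvise(fd, static_cast<off_t>(range.offset),
                                static_cast<off_t>(range.length),
                                POSIX_FADV_WILLNEED);
        if (err != 0) {
          return err;  // posix_fadvise returns an errno value directly
        }
      }
      return 0;
    }

For Parquet, the ranges passed in would be exactly the column-chunk byte ranges computed from the file metadata, so no coalescing heuristics would be needed at this layer.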
François

On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques <[email protected]> wrote:
>
> Hello David,
>
> I think that what you ask is achievable with the dataset API without much effort. You'd have to insert the pre-buffering at ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is essentially a generator that looks like flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the fragments in order. The application consuming the ScanTasks could control the number of scheduled tasks by looking at the IO pool load.
>
> OTOH, it would be good if we could make this format-agnostic, e.g. offer it via a ScanOptions toggle such as "readahead_files", which would be applicable to all formats: CSV, IPC, ...
>
> François
>
> [1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>
> On Thu, Apr 30, 2020 at 8:20 AM David Li <[email protected]> wrote:
> >
> > Sure, and we are still interested in collaborating. The main use case we have is scanning datasets in order of the partition key; it seems ordering is the only thing missing from Antoine's comments. However, from briefly playing around with the Python API, an application could manually order the fragments if so desired, so that still works for us, even if ordering isn't otherwise a guarantee.
> >
> > Performance-wise, we would want intra-file concurrency (coalescing) and inter-file concurrency (buffering files in order, as described in my previous messages). Even if Datasets doesn't directly handle this, it'd be ideal if an application could achieve it if it were willing to manage the details. I also vaguely remember seeing some interest in being able to distribute a computation over a dataset via Dask or some other distributed computation system, which would also be interesting to us, though it isn't a concrete requirement.
> >
> > I'd like to reference the original proposal document, which has more detail on our workloads and use cases: https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> > As described there, we have a library that implements both a datasets-like API (hand it a remote directory, get back an Arrow Table) and several optimizations to make that library perform acceptably. Our motivation here is to have a path to migrating to using and contributing to Arrow Datasets, which we see as a cross-language, cross-filesystem library, without regressing in performance. (We are limited to Python and S3.)
> >
> > Best,
> > David
> >
> > On 4/29/20, Wes McKinney <[email protected]> wrote:
> > > On Wed, Apr 29, 2020 at 6:54 PM David Li <[email protected]> wrote:
> > >>
> > >> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't guaranteed to download all the files in order, but with more control, you can make this more likely. You can also prevent the case where, due to scheduling, file N+1 doesn't even start downloading until after file N+2, which can happen if you just submit all reads to a thread pool, as demonstrated in the linked trace.
> > >>
> > >> And again, with this level of control, you can also decide to reduce or increase parallelism based on network conditions, memory usage, other readers, etc. So it is both about improving/smoothing out performance and about limiting resource consumption.
> > >>
> > >> Finally, I do not mean to propose that we necessarily build all of this into Arrow, just that we would like to make it possible to build this with Arrow, and that Datasets may find this interesting for its optimization purposes, if concurrent reads are a goal.
> > >>
> > >> > Except that datasets are essentially unordered.
> > >>
> > >> I did not realize this, but that means it's not really suitable for our use case, unfortunately.
> > >
> > > It would be helpful to understand things a bit better so that we do not miss out on an opportunity to collaborate. I don't know that the current mode of some of the public Datasets APIs is a dogmatic view about how everything should always work, and it's possible that some relatively minor changes could allow you to use it. So let's try not to close any doors right now.
> > >
> > >> Thanks,
> > >> David
> > >>
> > >> On 4/29/20, Antoine Pitrou <[email protected]> wrote:
> > >> >
> > >> > On 29/04/2020 at 23:30, David Li wrote:
> > >> >> Sure -
> > >> >>
> > >> >> The use case is to read a large partitioned dataset, consisting of tens or hundreds of Parquet files. A reader expects to scan through the data in order of the partition key. However, to improve performance, we'd like to begin loading files N+1, N+2, ..., N+k while the consumer is still reading file N, so that it doesn't have to wait every time it opens a new file, and to help hide any latency or slowness that might be happening on the backend. We also don't want to be in a situation where file N+2 is ready but file N+1 isn't, because that doesn't help us (we still have to wait for N+1 to load).
> > >> >
> > >> > But depending on network conditions, you may very well get file N+2 before N+1, even if you start loading it after...
> > >> >
> > >> >> This is why I mention the project is quite similar to the Datasets project - Datasets likely covers all the functionality we would eventually need.
> > >> >
> > >> > Except that datasets are essentially unordered.
> > >> >
> > >> > Regards
> > >> >
> > >> > Antoine.
