Catching up on questions here...

> Typically you can solve this by having enough IO concurrency at once :-)
> I'm not sure having sophisticated global coordination (based on which
> algorithms) would bring anything.  Would you care to elaborate?

We aren't proposing *sophisticated* global coordination, rather, just
using a global pool with a global limit, so that a user doesn't
unintentionally start hundreds of requests in parallel, and so that
you can adjust the resource consumption/performance tradeoff.

Essentially, what our library does is maintain two pools (for I/O):
- One pool produces I/O requests, by going through the list of files,
fetching the Parquet footers, and queuing up I/O requests on the main
pool. (This uses a pool so we can fetch and parse metadata from
multiple Parquet files at once.)
- One pool serves I/O requests, by fetching chunks and placing them in
buffers inside the file object implementation.

The global concurrency manager additionally limits the second pool by
not servicing I/O requests for a file until all of the I/O requests
for previous files have at least started. (By just having lots of
concurrency, you might end up starving yourself by reading data you
don't want quite yet.)

Additionally, the global pool could still be a win for non-Parquet
files - an implementation can at least submit, say, an entire CSV file
as a "chunk" and have it read in the background.

> Actually, on a more high-level basis, is the goal to prefetch for
> sequential consumption of row groups?

At least for us, our query pattern is to sequentially consume row
groups from a large dataset, where we select a subset of columns and a
subset of the partition key range (usually time range). Prefetching
speeds this up substantially, or in general, pipelining discovery of
files, I/O, and deserialization.

> There are no situations where you would want to consume a scattered
> subset of row groups (e.g. predicate pushdown)?

With coalescing, this "automatically" gets optimized. If you happen to
need column chunks from separate row groups that are adjacent or close
on-disk, coalescing will still fetch them in a single IO call.

We found that having large row groups was more beneficial than small
row groups, since when you combine small row groups with column
selection, you end up with a lot of small non-adjacent column chunks -
which coalescing can't help with. The exact tradeoff depends on the
dataset and workload, of course.

> This seems like too much to try to build into RandomAccessFile. I would
> suggest a class that wraps a random access file and manages cached segments
> and their lifetimes through explicit APIs.

A wrapper class seems ideal, especially as the logic is agnostic to
the storage backend (except for some parameters which can either be
hand-tuned or estimated on the fly). It also keeps the scope of the
changes down.

> Where to put the "async multiple range request" API is a separate question,
> though. Probably makes sense to start writing some working code and sort it
> out there.

We haven't looked in this direction much. Our designs are based around
thread pools partly because we wanted to avoid modifying the Parquet
and Arrow internals, instead choosing to modify the I/O layer to "keep
Parquet fed" as quickly as possible.

Overall, I recall there's an issue open for async APIs in
Arrow...perhaps we want to move that to a separate discussion, or on
the contrary, explore some experimental APIs here to inform the
overall design.

Thanks,
David

On 2/6/20, Wes McKinney <wesmck...@gmail.com> wrote:
> On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <anto...@python.org> wrote:
>>
>>
>> Le 06/02/2020 à 20:20, Wes McKinney a écrit :
>> >> Actually, on a more high-level basis, is the goal to prefetch for
>> >> sequential consumption of row groups?
>> >>
>> >
>> > Essentially yes. One "easy" optimization is to prefetch the entire
>> > serialized row group. This is an evolution of that idea where we want
>> > to
>> > prefetch only the needed parts of a row group in a minimum number of IO
>> > calls (consider reading the first 10 columns from a file with 1000
>> > columns
>> > -- so we want to do one IO call instead of 10 like we do now).
>>
>> There are no situations where you would want to consume a scattered
>> subset of row groups (e.g. predicate pushdown)?
>
> There are. If it can be demonstrated that there are performance gains
> resulting from IO optimizations involving multiple row groups then I
> see no reason not to implement them.
>
>> Regards
>>
>> Antoine.
>

Reply via email to