On Wed, Feb 5, 2020 at 3:37 PM David Li <li.david...@gmail.com> wrote:
>
> Hi Antoine and Wes,
>
> Thanks for the feedback. Yes, we should definitely consider these as
> separate features.
>
> I agree that it makes sense for the file API (or a derived API) to
> expose a generic CacheRanges or PrebufferRanges API. It could then do
> coalescing and prefetching as desired based on the actual underlying
> data store. (Or, perhaps, the actual prefetch/coalesce steps could be
> implemented as wrappers also implementing the file API, with
> parameters to tune for concurrency level/resource consumption/backend
> storage characteristics; underlying file implementations would just
> treat this as a no-op. This is what we prototyped internally in C++
> before writing this up - a wrapper around S3File that naively
> prefetched the entire file.)
>
> As a separate step, prefetching/caching should also make use of a
> global (or otherwise shared) IO thread pool, so that parallel reads of
> different files implicitly coordinate work with each other as well.
> Then, you could queue up reads of several Parquet files, such that a
> slow network call for one file doesn't block progress for other files,
> without issuing reads for all of these files at once.
>
> It's unclear to me what readahead at the record batch level would
> accomplish - Parquet reads each column chunk in a row group as a
> whole, and if the row groups are large, then multiple record batches
> would fall in the same row group, so then we wouldn't gain any
> parallelism, no? (Admittedly, I'm not familiar with the internals
> here.)
>
> The concurrency manager would apply mostly to networked filesystems,
> yes. It's more an observation that some workloads issue fewer reads of
> large ranges, and other workloads issue lots of reads on small ranges,
> so concurrency should be limited based on estimated bandwidth, and not
> purely on number of concurrent tasks.
>
> In this case, it sounds like there are at least the following concrete tasks:
> - Adding a (no-op) CacheRanges method to the RandomAccessFile API.

I'll comment in more detail on some of the other items in due course,
but I think this should be handled by an implementation of
RandomAccessFile (that wraps a naked RandomAccessFile) with some
additional methods, rather than adding this to the abstract
RandomAccessFile interface, e.g.

class CachingInputFile : public RandomAccessFile {
 public:
   CachingInputFile(std::shared_ptr<RandomAccessFile> naked_file);
   Status CacheRanges(...);
};

etc.

> - Providing a column range hint to Parquet, and compute column chunk
> ranges to buffer via CacheRanges,
> - Providing an implementation of CacheRanges (e.g. via a wrapper
> RandomAccessFile) that coalesces ranges based on backend storage
> characteristics,
> - Providing an implementation of CacheRanges that actually prefetches
> ranges in the background,
> - Providing a shared I/O pool to coordinate such I/O across multiple files,
> - Providing some mechanism to limit concurrency in this shared pool,
> - Sending down all these hints from the Datasets implementation.
>
> Thanks,
> David
>
>
> On 2/5/20, Wes McKinney <wesmck...@gmail.com> wrote:
> > I agree with separating the problem into its constituent concerns to
> > make sure that we are developing appropriate abstractions.
> >
> > Speaking specifically about the Parquet codebase, the way that we
> > access a particular ColumnChunk in a row group is fairly simplistic.
> > See the ReaderProperties::GetStream method
> >
> > https://github.com/apache/arrow/blob/master/cpp/src/parquet/properties.cc#L28
> >
> > Rather than naively issuing a Read command (either buffered or
> > unbuffered, both bad for filesystems like S3), I think we need to
> > insert an abstraction at the point where the column reader class
> > requests an InputStream for its serialized column chunk data, which is
> > right here
> >
> > https://github.com/apache/arrow/blob/master/cpp/src/parquet/file_reader.cc#L123
> >
> > It seems like a stateful object that allows certain byte ranges of the
> > file to be cached would do the trick.
> >
> > Next, an API needs to be provided for applications to indicate which
> > column chunks they intend to read so that the contiguous byte ranges
> > can be pre-buffered, preventing any column reader from issuing naked
> > IO calls. Seems like this should happen at the ReaderProperties level.
> >
> > The IO scheduling seems like it should be abstracted away from the
> > Parquet library itself. So there would be code similar to
> >
> > std::pair<int64_t, int64_t> read_ranges = ComputePrebufferedRanges();
> > RETURN_NOT_OK(caching_file->CacheRanges(read_ranges));
> >
> > The IO scheduling can happen inside the implementation of CacheRanges.
> > Then when the column reader is created it will grab a slice of the
> > cached data rather than issuing IO calls.
> >
> > Let me know if this analysis makes sense
> >
> > - Wes
> >
> > On Wed, Feb 5, 2020 at 9:24 AM Antoine Pitrou <anto...@python.org> wrote:
> >>
> >>
> >> Hi David,
> >>
> >> I think we should discuss this as individual features.
> >>
> >> > Read Coalescing: from Parquet metadata, we know exactly> which byte
> >> > ranges of a file will be read, and can “cheatin the S3 IO
> >> layer by fetching them in advance
> >>
> >> It seems there are two things here: coalescing individual reads, and
> >> issuing them in advance.  It seems those are separate concerns.
> >>
> >> - coalescing reads: should the IO layer expose a ReadRanges function to
> >> issue several reads at once, which the Parquet layer can then exploit?
> >>
> >> - issuing reads in advance: isn't that solved by readahead *at the
> >> record batch level* (not the IO block level)?
> >>
> >> > Concurrency Manager: rather than limit parallelism by number of
> >> > outstanding tasks, we can instead limit the estimated bandwidth
> >> > consumption, allowing better performance when read sizes are small.
> >>
> >> - Concurrency Manager: is this a per-source optimization, applying
> >> mainly to networked filesystems?
> >>
> >>
> >> I think we want to make sure that each feature brings progress, instead
> >> of trying to lump everything at once in a big PR.
> >>
> >> Regards
> >>
> >> Antoine.
> >>
> >>
> >>
> >> Le 05/02/2020 à 14:32, David Li a écrit :
> >> > Hello all,
> >> >
> >> > We've been following the Arrow Datasets project with great interest,
> >> > especially as we have an in-house library with similar goals built on
> >> > top of PyArrow. Recently, we noticed some discussion around optimizing
> >> > I/O for such use cases (e.g. PARQUET-1698), which is also where we had
> >> > focused our efforts.
> >> >
> >> > Our long-term goal has been to open-source our library. However, our
> >> > code is in Python, but it would be most useful to everyone in the C++
> >> > core, so that R, Python, Ruby, etc. could benefit. Thus, we'd like to
> >> > share our high-level design, and offer to work with the community on
> >> > the implementation - at the very least, to avoid duplicating work.
> >> > We've summarized our approach, and hope this can start a discussion on
> >> > how to integrate such optimizations into Datasets:
> >> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit#
> >> >
> >> > At a high level, we have three main optimizations:
> >> > - Given a set of columns to read, and potentially a filter on a
> >> > partition key, we can use Parquet metadata to compute exact byte
> >> > ranges to read from remote storage, and coalesce/split up reads as
> >> > necessary based on the characteristics of the storage platform.
> >> > - Given byte ranges to read, we can read them in parallel, using a
> >> > global thread pool and concurrency manager to limit parallelism and
> >> > resource consumption.
> >> > - By working at the level of a dataset, we can parallelize these
> >> > operations across files, and pipeline steps like reading Parquet
> >> > metadata with reading and deserialization.
> >> >
> >> > We focus on Parquet and S3/object storage here, but these concepts
> >> > apply to other file formats and storage systems.
> >> >
> >> > The main questions here are whether we think the optimizations are
> >> > useful for Arrow Datasets, and if so, how the API design and
> >> > implementation would proceed - I'd appreciate any feedback on the
> >> > approach here and potential API.
> >> >
> >> > David
> >> >
> >

Reply via email to