Hi all,

I’d like to follow up on this discussion. Thanks to Antoine, we now
have a read coalescing implementation in-tree which shows clear
performance benefits both when reading plain files and Parquet
files[1]. We now have some follow-up work whose design and
implementation we think could be of interest to the Datasets project.
Some context follows.

We’re using coalescing while loading multiple files in parallel, and
have found that we don’t get the expected speedup. We took the
straightforward approach: buffer up to N additional files in the
background using multiple threads, and read from each file in
sequence, adding more files to the tail as the files at the head are
consumed. This way, we begin loading data for future files while
deserializing/processing the current file.
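
In rough C++ terms, the pipeline looks like the sketch below. To be
clear, LoadFile and FileData are hypothetical stand-ins for whatever
buffers and deserializes a single file; this is not actual Arrow API:

#include <cstddef>
#include <deque>
#include <future>
#include <string>
#include <vector>

struct FileData {};  // placeholder for one file's buffered contents

// Hypothetical: reads and buffers one file on a background thread.
FileData LoadFile(const std::string& /*path*/) { return FileData{}; }

void ProcessFiles(const std::vector<std::string>& paths,
                  std::size_t readahead) {
  std::deque<std::future<FileData>> inflight;
  std::size_t next = 0;
  // Prime the pipeline: start buffering up to `readahead` files.
  while (next < paths.size() && inflight.size() < readahead) {
    inflight.push_back(
        std::async(std::launch::async, LoadFile, paths[next++]));
  }
  while (!inflight.empty()) {
    // Consume the head; the tail keeps loading in the background.
    FileData data = inflight.front().get();
    inflight.pop_front();
    // Top up the tail so the pipeline stays `readahead` files deep.
    if (next < paths.size()) {
      inflight.push_back(
          std::async(std::launch::async, LoadFile, paths[next++]));
    }
    // ... deserialize/process `data` for the current file here ...
  }
}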

However, this approach doesn't actually deliver the expected
benefits. Consider files A, B, and C being buffered in parallel:
right now, all I/O goes through an internal I/O pool, so several
operations for each of the three files get added to the pool. They
are serviced in some arbitrary order, however, so it's possible for
file C to finish all of its I/O before file B does. The consumer is
then stuck waiting for file B's reads to complete, even though file
C's data is already buffered.

As a demonstration, see the third chart in this visualization:
https://www.lidavidm.me/arrow/coalescing/vis.html
For context: this is a plot of trace spans, where the x-axis is time,
the y-axis is thread ID, and the color-coding indicates which file a
span is associated with. As you can see, there is a large gap because
file_1.parquet gets scheduled after file_2.parquet and
file_3.parquet, and so the consumer thread (third row from the
bottom) is stuck waiting for it the whole time.

Our proposed approach is to give the application enough control that
it can coordinate I/O across separate files. Returning to the example
above: if the application could discover and execute the I/O
operations for a Parquet file (or FeatherV2 file, etc.) separately,
it could hold off on executing I/O for file C until all I/O for file
B has at least started. For example, in the current framework, if we
could tell how many I/O calls will happen when reading a Parquet
file, we could insert a countdown latch between the Parquet reader
and the RandomAccessFile; the latch would enqueue reads and prevent
them from progressing until all reads for the previous file have made
it through.
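
To sketch the latch idea (to be clear, Latch and GatedReader below
are hypothetical; a real version would implement the
arrow::io::RandomAccessFile interface and forward the actual reads):

#include <condition_variable>
#include <cstdint>
#include <mutex>

// A simple countdown latch: opens once `count` events have occurred.
class Latch {
 public:
  explicit Latch(int64_t count) : count_(count) {}
  void CountDown() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--count_ <= 0) cv_.notify_all();
  }
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ <= 0; });
  }
 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int64_t count_;
};

// Gates the reads for one file on the previous file's latch: each
// read blocks until all reads for the previous file have at least
// been issued. The first file in the sequence would simply get a
// latch initialized to zero.
class GatedReader {
 public:
  GatedReader(Latch* previous_file, Latch* this_file)
      : previous_(previous_file), mine_(this_file) {}

  void ReadAt(int64_t offset, int64_t nbytes) {
    previous_->Wait();   // wait until the prior file's I/O is underway
    mine_->CountDown();  // record that one of this file's reads started
    // ... forward (offset, nbytes) to the wrapped RandomAccessFile ...
  }

 private:
  Latch* previous_;
  Latch* mine_;
};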

This has additional benefits: the application can lower concurrency
to limit memory usage. Or conversely, if it notices that many small
reads are being issued, it can raise concurrency to keep bandwidth
utilization high, confident that memory usage stays bounded. By
controlling the ordering, the data consumer also gets more consistent
(lower-variance) read performance.

Concretely, for the PR adding coalescing to Parquet[1], I'd want to
rework it so that the Parquet reader can be asked for the byte ranges
it will read, given a set of row groups and column indices. Then, I'd
like to rework the reader APIs to optionally accept a pre-populated
ReadCache, so that the application can populate the ReadCache however
it wants. Any further control over I/O would be done by the
application, by wrapping RandomAccessFile as appropriate.
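
To make the shape of that concrete, here is a hypothetical usage
sketch; GetByteRanges, ReadCache, and IssueRead are invented names
standing in for whatever the reworked reader API would actually
expose:

#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct ReadRange { int64_t offset; int64_t length; };

// Hypothetical pre-populated cache: (offset, length) -> buffered bytes.
class ReadCache {
 public:
  void Insert(const ReadRange& r, std::string data) {
    buffers_[{r.offset, r.length}] = std::move(data);
  }
  const std::string* Find(const ReadRange& r) const {
    auto it = buffers_.find({r.offset, r.length});
    return it == buffers_.end() ? nullptr : &it->second;
  }
 private:
  std::map<std::pair<int64_t, int64_t>, std::string> buffers_;
};

// Hypothetical: the reader reports which byte ranges a scan of the
// given row groups and columns would touch, without doing any I/O.
std::vector<ReadRange> GetByteRanges(const std::vector<int>& row_groups,
                                     const std::vector<int>& columns) {
  return {{0, 1024}};  // e.g. footer/metadata plus column chunk ranges
}

// Hypothetical: the application's own I/O path; this is where ordering
// and concurrency decisions would be made.
std::string IssueRead(const ReadRange& r) {
  return std::string(static_cast<std::size_t>(r.length), '\0');
}

int main() {
  ReadCache cache;
  // The application decides when and in what order reads execute.
  for (const ReadRange& r : GetByteRanges({0, 1}, {2, 5})) {
    cache.Insert(r, IssueRead(r));
  }
  // The reader would then be handed this cache and would serve its
  // reads from it instead of hitting the filesystem directly.
  return cache.Find({0, 1024}) ? 0 : 1;
}

The key point is that the middle step is entirely under the
application's control - that is where cross-file ordering, concurrency
limits, and any further coalescing policy would live.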

We realize Datasets could do a lot of this, especially as it already
works with multiple files and Amazon S3, and has a JNI binding in
progress (Java being the main impetus of our current project).
However, we'd feel more comfortable building on Datasets once its API
is more stable. In the meantime, we would be interested in pursuing
this as an improvement to Datasets. I'm not yet familiar enough with
the project and its roadmap to know how this would fit in - but does
this sound like something the project would want to address
eventually?

Best,
David

[1]: https://github.com/apache/arrow/pull/6744#issuecomment-607431959
Note some caveats on the numbers there - the rates are only correct
relative to each other since the benchmark doesn’t measure the actual
size of the deserialized data.


On 3/23/20, David Li <li.david...@gmail.com> wrote:
> Thanks. I've set up an AWS account for my own testing for now. I've
> also submitted a PR to add a basic benchmark which can be run
> self-contained, against a local Minio instance, or against S3:
> https://github.com/apache/arrow/pull/6675
>
> I ran the benchmark from my local machine, and I can test from EC2
> sometime as well. Performance is not ideal, but I'm being limited by
> my home internet connection - coalescing small chunked reads is (as
> expected) as fast as reading the file in one go, and in the PR
> (testing against localhost where we're not limited by bandwidth), it's
> faster than either option.
>
> ------------------------------------------------------------------------------------------------------------------
> Benchmark                                            Time           CPU Iterations
> ------------------------------------------------------------------------------------------------------------------
> MinioFixture/ReadAll1Mib/real_time            223416933 ns    9098743 ns        413   4.47594MB/s    4.47594 items/s
> MinioFixture/ReadAll100Mib/real_time         6068938152 ns  553319299 ns         10   16.4773MB/s   0.164773 items/s
> MinioFixture/ReadAll500Mib/real_time        30735046155 ns 2620718364 ns          2   16.2681MB/s  0.0325361 items/s
> MinioFixture/ReadChunked100Mib/real_time     9625661666 ns  448637141 ns         12   10.3889MB/s   0.103889 items/s
> MinioFixture/ReadChunked500Mib/real_time    58736796101 ns 2070237834 ns          2   8.51255MB/s  0.0170251 items/s
> MinioFixture/ReadCoalesced100Mib/real_time   6982902546 ns   22553824 ns         10   14.3207MB/s   0.143207 items/s
> MinioFixture/ReadCoalesced500Mib/real_time  29923239648 ns  112736805 ns          3   16.7094MB/s  0.0334188 items/s
> MinioFixture/ReadParquet250K/real_time      21934689795 ns 2052758161 ns          3   9.90955MB/s  0.0455899 items/s
>
> Best,
> David
>
>
> On 3/22/20, Wes McKinney <wesmck...@gmail.com> wrote:
>> On Thu, Mar 19, 2020 at 10:04 AM David Li <li.david...@gmail.com> wrote:
>>>
>>> > That's why it's important that we set ourselves up to do performance
>>> > testing in a realistic environment in AWS rather than simulating it.
>>>
>>> For my clarification, what are the plans for this (if any)? I couldn't
>>> find any prior discussion, though it sounds like the discussion around
>>> cloud CI capacity would be one step towards this.
>>>
>>> In the short term we could make tests/benchmarks configurable to not
>>> point at a Minio instance so individual developers can at least try
>>> things.
>>
>> It probably makes sense to begin investing in somewhat portable
>> tooling to assist with running S3-related unit tests and benchmarks
>> inside AWS. This could include initial Parquet dataset generation and
>> other things.
>>
>> As far as testing, I'm happy to pay for some AWS costs (within
>> reason). AWS might also be able to donate some credits to us.
>>
>>> Best,
>>> David
>>>
>>> On 3/18/20, David Li <li.david...@gmail.com> wrote:
>>> > At least for us, it applies to S3-like systems, not only S3 itself.
>>> >
>>> > It does make sense to limit it to some filesystems. The behavior would
>>> > be opt-in at the Parquet reader level, so at the Datasets or
>>> > Filesystem layer we can take care of enabling the flag for filesystems
>>> > where it actually helps.
>>> >
>>> > I've filed these issues:
>>> > - ARROW-8151 to benchmark S3File+Parquet
>>> > (https://issues.apache.org/jira/browse/ARROW-8151)
>>> > - ARROW-8152 to split large reads
>>> > (https://issues.apache.org/jira/browse/ARROW-8152)
>>> > - PARQUET-1820 to use a column filter hint with coalescing
>>> > (https://issues.apache.org/jira/browse/PARQUET-1820)
>>> >
>>> > in addition to PARQUET-1698 which is just about pre-buffering the
>>> > entire row group (which we can now do with ARROW-7995).
>>> >
>>> > Best,
>>> > David
>>> >
>>> > On 3/18/20, Antoine Pitrou <anto...@python.org> wrote:
>>> >>
>>> >> On 18/03/2020 at 18:30, David Li wrote:
>>> >>>> Instead of S3, you can use the Slow streams and Slow filesystem
>>> >>>> implementations.  It may better protect against varying external
>>> >>>> conditions.
>>> >>>
>>> >>> I think we'd want several different benchmarks - we want to ensure
>>> >>> we
>>> >>> don't regress local filesystem performance, and we also want to
>>> >>> measure in an actual S3 environment. It would also be good to
>>> >>> measure
>>> >>> S3-compatible systems like Google's.
>>> >>>
>>> >>>>> - Use the coalescing inside the Parquet reader (even without a
>>> >>>>> column
>>> >>>>> filter hint - this would subsume PARQUET-1698)
>>> >>>>
>>> >>>> I'm assuming this would be done at the RowGroupReader level, right?
>>> >>>
>>> >>> Ideally we'd be able to coalesce across row groups as well, though
>>> >>> maybe it'd be easier to start with within-row-group-only (I need to
>>> >>> familiarize myself with the reader more).
>>> >>>
>>> >>>> I don't understand what the "advantage" would be.  Can you
>>> >>>> elaborate?
>>> >>>
>>> >>> As Wes said, empirically you can get more bandwidth out of S3 with
>>> >>> multiple concurrent HTTP requests. There is a cost to doing so
>>> >>> (establishing a new connection takes time), hence why the coalescing
>>> >>> tries to group small reads (to fully utilize one connection) and
>>> >>> split
>>> >>> large reads (to be able to take advantage of multiple connections).
>>> >>
>>> >> If that's S3-specific (or even AWS-specific) it might better be done
>>> >> inside the S3 filesystem.  For other filesystems I don't think it
>>> >> makes
>>> >> sense to split reads.
>>> >>
>>> >> Regards
>>> >>
>>> >> Antoine.
>>> >>
>>> >
>>
>
