Hi all,

I'd like to follow up on this discussion. Thanks to Antoine, we now have a read coalescing implementation in-tree, which shows clear performance benefits both for plain files and for Parquet files[1]. We now have some follow-up work whose design and implementation we think could be interesting to the Datasets project. Some context follows.
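For anyone who hasn't followed the earlier threads, the core idea behind coalescing is to merge nearby small reads into fewer, larger reads so that each request to S3 (or a similar high-latency store) does more useful work. A simplified sketch of the idea (illustrative only - this is not the in-tree implementation, and the names CoalesceRanges/hole_size_limit/range_size_limit are just for exposition):

#include <algorithm>
#include <cstdint>
#include <vector>

struct ReadRange {
  int64_t offset;
  int64_t length;
};

// Merge byte ranges separated by less than hole_size_limit, without
// letting a merged range grow past range_size_limit.
std::vector<ReadRange> CoalesceRanges(std::vector<ReadRange> ranges,
                                      int64_t hole_size_limit,
                                      int64_t range_size_limit) {
  std::sort(ranges.begin(), ranges.end(),
            [](const ReadRange& a, const ReadRange& b) {
              return a.offset < b.offset;
            });
  std::vector<ReadRange> out;
  for (const auto& r : ranges) {
    if (!out.empty()) {
      ReadRange& last = out.back();
      const int64_t gap = r.offset - (last.offset + last.length);
      const int64_t merged =
          std::max(last.length, r.offset + r.length - last.offset);
      if (gap <= hole_size_limit && merged <= range_size_limit) {
        last.length = merged;  // Extend the previous range across the hole.
        continue;
      }
    }
    out.push_back(r);
  }
  return out;
}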
We're using coalescing while loading multiple files in parallel, but we aren't getting the expected speedup. We took the straightforward approach: buffer up to N additional files in the background using multiple threads, and read from each file in sequence, adding more files to the tail as the files at the head are consumed. This way, we begin loading data for future files while deserializing/processing the current file. In practice, however, this doesn't bring the expected benefits.

Consider files A, B, and C being buffered in parallel. Right now, all I/O goes through an internal I/O pool, so several operations for each of the three files get added to the pool. However, they get serviced in some arbitrary order, so it's possible for file C to finish all its I/O operations before file B does. A consumer that needs file B is then unnecessarily stuck waiting for its reads to complete, even though file C is already fully buffered. As a demonstration, see the third chart in this visualization: https://www.lidavidm.me/arrow/coalescing/vis.html For context: this is a plot of trace spans, where the x-axis is time, the y-axis is thread ID, and the color-coding indicates which file a span is associated with. As you can see, there is a large gap because file_1.parquet gets scheduled after file_2.parquet and file_3.parquet, so the consumer thread (third row from the bottom) is stuck waiting the whole time.

Our proposed approach is to give the application enough control that it can coordinate I/O across separate files. Returning to the example above, if the application could discover and execute the I/O operations for a Parquet file (or FeatherV2, etc.) separately, it could hold off on executing I/O for file C until all I/O for file B has at least started. For example, in the current framework, if we could tell how many I/O calls would happen when reading from a Parquet file, we could insert a countdown latch between the Parquet reader and the RandomAccessFile, which enqueues reads and prevents them from progressing until all reads for the previous file have made it through (see the rough sketch below).

This has additional benefits: the application can lower concurrency to limit memory usage. Conversely, if it notices many small reads being issued, it can raise concurrency to keep bandwidth utilization high while being confident that it isn't using additional memory. By controlling the ordering, the data consumer also gets more consistent (less variable) read performance.

Concretely, for the PR adding coalescing to Parquet[1], I'd want to rework it so that the Parquet reader can be asked for the byte ranges it will read, given a set of row groups and column indices. Then, I'd like to rework the reader APIs to optionally accept a pre-populated ReadCache, so that the application can populate the ReadCache as it wants. Any further control over I/O would be done by the application, by wrapping RandomAccessFile as appropriate.

We realize Datasets could do a lot of this, especially as it already works with multiple files and Amazon S3 and has a JNI binding in progress (Java being the main impetus of our current project). However, we'd feel more comfortable building on Datasets once its API is more stable. In the meantime, though, we would still be interested in pursuing this as an improvement to Datasets. I'm still not familiar enough with the project and its roadmap to know how this would fit in - but does this sound like something the project would eventually want to address?
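To make the countdown-latch idea above a little more concrete, here is a rough sketch. All names here (CountdownLatch, FileLike, GatedFile) are hypothetical, the file interface is boiled down to a single ReadAt call, and this is meant as an illustration of the gating rather than a proposal for the actual API:

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical countdown latch: Wait() blocks until CountDown() has been
// called `count` times.
class CountdownLatch {
 public:
  explicit CountdownLatch(int64_t count) : count_(count) {}

  void CountDown() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--count_ <= 0) cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ <= 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int64_t count_;
};

// Stand-in for a random access file interface, reduced to one method.
class FileLike {
 public:
  virtual ~FileLike() = default;
  virtual int64_t ReadAt(int64_t offset, int64_t length, void* out) = 0;
};

// Wraps a file so that its reads cannot start before all reads of the
// previous file in the queue have at least been issued.
class GatedFile : public FileLike {
 public:
  // `previous` is the latch of the file ahead of us (may be null for the
  // first file); `own` is initialized to the number of reads we expect to
  // issue for this file.
  GatedFile(std::shared_ptr<FileLike> wrapped,
            std::shared_ptr<CountdownLatch> previous,
            std::shared_ptr<CountdownLatch> own)
      : wrapped_(std::move(wrapped)),
        previous_(std::move(previous)),
        own_(std::move(own)) {}

  int64_t ReadAt(int64_t offset, int64_t length, void* out) override {
    if (previous_) previous_->Wait();  // Don't jump ahead of the prior file.
    own_->CountDown();                 // This read now counts as issued.
    return wrapped_->ReadAt(offset, length, out);
  }

 private:
  std::shared_ptr<FileLike> wrapped_;
  std::shared_ptr<CountdownLatch> previous_;
  std::shared_ptr<CountdownLatch> own_;
};

The application would wrap each buffered file's RandomAccessFile in something like GatedFile, initializing `own` to the number of reads the Parquet reader reports for that file and chaining `previous` to the latch of the file ahead of it in the queue; that chaining is what gives the ordering guarantee described above.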
Best,
David

[1]: https://github.com/apache/arrow/pull/6744#issuecomment-607431959
Note some caveats on the numbers there - the rates are only correct
relative to each other, since the benchmark doesn't measure the actual
size of the deserialized data.

On 3/23/20, David Li <li.david...@gmail.com> wrote:
> Thanks. I've set up an AWS account for my own testing for now. I've
> also submitted a PR to add a basic benchmark which can be run
> self-contained, against a local Minio instance, or against S3:
> https://github.com/apache/arrow/pull/6675
>
> I ran the benchmark from my local machine, and I can test from EC2
> sometime as well. Performance is not ideal, but I'm being limited by
> my home internet connection - coalescing small chunked reads is (as
> expected) as fast as reading the file in one go, and in the PR
> (testing against localhost where we're not limited by bandwidth), it's
> faster than either option.
>
> ------------------------------------------------------------------------------------------------------------
> Benchmark                                          Time            CPU  Iterations
> ------------------------------------------------------------------------------------------------------------
> MinioFixture/ReadAll1Mib/real_time             223416933 ns     9098743 ns   413  4.47594MB/s   4.47594 items/s
> MinioFixture/ReadAll100Mib/real_time          6068938152 ns   553319299 ns    10  16.4773MB/s   0.164773 items/s
> MinioFixture/ReadAll500Mib/real_time         30735046155 ns  2620718364 ns     2  16.2681MB/s   0.0325361 items/s
> MinioFixture/ReadChunked100Mib/real_time      9625661666 ns   448637141 ns    12  10.3889MB/s   0.103889 items/s
> MinioFixture/ReadChunked500Mib/real_time     58736796101 ns  2070237834 ns     2  8.51255MB/s   0.0170251 items/s
> MinioFixture/ReadCoalesced100Mib/real_time    6982902546 ns    22553824 ns    10  14.3207MB/s   0.143207 items/s
> MinioFixture/ReadCoalesced500Mib/real_time   29923239648 ns   112736805 ns     3  16.7094MB/s   0.0334188 items/s
> MinioFixture/ReadParquet250K/real_time       21934689795 ns  2052758161 ns     3  9.90955MB/s   0.0455899 items/s
>
> Best,
> David
>
>
> On 3/22/20, Wes McKinney <wesmck...@gmail.com> wrote:
>> On Thu, Mar 19, 2020 at 10:04 AM David Li <li.david...@gmail.com> wrote:
>>>
>>> > That's why it's important that we set ourselves up to do performance
>>> > testing in a realistic environment in AWS rather than simulating it.
>>>
>>> For my clarification, what are the plans for this (if any)? I couldn't
>>> find any prior discussion, though it sounds like the discussion around
>>> cloud CI capacity would be one step towards this.
>>>
>>> In the short term we could make tests/benchmarks configurable to not
>>> point at a Minio instance so individual developers can at least try
>>> things.
>>
>> It probably makes sense to begin investing in somewhat portable
>> tooling to assist with running S3-related unit tests and benchmarks
>> inside AWS. This could include initial Parquet dataset generation and
>> other things.
>>
>> As far as testing, I'm happy to pay for some AWS costs (within
>> reason). AWS might be able to donate some credits to us also
>>
>>> Best,
>>> David
>>>
>>> On 3/18/20, David Li <li.david...@gmail.com> wrote:
>>> > For us it applies to S3-like systems, not only S3 itself, at least.
>>> >
>>> > It does make sense to limit it to some filesystems. The behavior would
>>> > be opt-in at the Parquet reader level, so at the Datasets or
>>> > Filesystem layer we can take care of enabling the flag for filesystems
>>> > where it actually helps.
>>> >
>>> > I've filed these issues:
>>> > - ARROW-8151 to benchmark S3File+Parquet
>>> >   (https://issues.apache.org/jira/browse/ARROW-8151)
>>> > - ARROW-8152 to split large reads
>>> >   (https://issues.apache.org/jira/browse/ARROW-8152)
>>> > - PARQUET-1820 to use a column filter hint with coalescing
>>> >   (https://issues.apache.org/jira/browse/PARQUET-1820)
>>> >
>>> > in addition to PARQUET-1698, which is just about pre-buffering the
>>> > entire row group (which we can now do with ARROW-7995).
>>> >
>>> > Best,
>>> > David
>>> >
>>> > On 3/18/20, Antoine Pitrou <anto...@python.org> wrote:
>>> >>
>>> >> On 18/03/2020 at 18:30, David Li wrote:
>>> >>>> Instead of S3, you can use the Slow streams and Slow filesystem
>>> >>>> implementations. It may better protect against varying external
>>> >>>> conditions.
>>> >>>
>>> >>> I think we'd want several different benchmarks - we want to ensure
>>> >>> we don't regress local filesystem performance, and we also want to
>>> >>> measure in an actual S3 environment. It would also be good to
>>> >>> measure S3-compatible systems like Google's.
>>> >>>
>>> >>>>> - Use the coalescing inside the Parquet reader (even without a
>>> >>>>>   column filter hint - this would subsume PARQUET-1698)
>>> >>>>
>>> >>>> I'm assuming this would be done at the RowGroupReader level, right?
>>> >>>
>>> >>> Ideally we'd be able to coalesce across row groups as well, though
>>> >>> maybe it'd be easier to start with within-row-group-only (I need to
>>> >>> familiarize myself with the reader more).
>>> >>>
>>> >>>> I don't understand what the "advantage" would be. Can you
>>> >>>> elaborate?
>>> >>>
>>> >>> As Wes said, empirically you can get more bandwidth out of S3 with
>>> >>> multiple concurrent HTTP requests. There is a cost to doing so
>>> >>> (establishing a new connection takes time), hence why the coalescing
>>> >>> tries to group small reads (to fully utilize one connection) and
>>> >>> split large reads (to be able to take advantage of multiple
>>> >>> connections).
>>> >>
>>> >> If that's S3-specific (or even AWS-specific) it might better be done
>>> >> inside the S3 filesystem. For other filesystems I don't think it
>>> >> makes sense to split reads.
>>> >>
>>> >> Regards
>>> >>
>>> >> Antoine.
>>> >>
>>> >
>>
>