Hey Mayur, thanks for the detailed writeup.

I would say that the performance you're seeing is very specific to the
file system implementation (as you've already discovered by replacing
the GHFS implementation).

Within Iceberg, this is scoped very specifically to the FileIO
implementation.  By default you will likely use HadoopFileIO, which
delegates to a Hadoop FileSystem implementation (the Hadoop local file
system, S3A, or the GHFS one you probably used by default).
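
To make that concrete, here's a minimal sketch of where FileIO sits in
the read path (the GCS path and seek offset below are just illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

// HadoopFileIO is the default; it delegates to whatever Hadoop
// FileSystem implementation matches the path's scheme.
FileIO io = new HadoopFileIO(new Configuration());
InputFile file = io.newInputFile("gs://bucket/db/table/data/file.parquet");
try (SeekableInputStream in = file.newStream()) {
    in.seek(4 * 1024 * 1024);  // Parquet readers seek to row group / column chunk offsets
    // ... read the column chunk bytes ...
}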

With S3, there's a dedicated S3FileIO implementation that would likely
perform better than S3A because it requires fewer requests, and even in
that case some tuning is possible via the S3 client supplied to the
S3FileIO.
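
For example, a hedged sketch of supplying a tuned client (the timeout
here is illustrative, not a recommendation):

import java.time.Duration;
import org.apache.iceberg.aws.s3.S3FileIO;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.s3.S3Client;

// S3FileIO takes a serializable supplier of the AWS SDK v2 client, so
// retry, timeout, and connection settings stay under your control.
S3FileIO io = new S3FileIO(() -> S3Client.builder()
    .overrideConfiguration(ClientOverrideConfiguration.builder()
        .apiCallTimeout(Duration.ofSeconds(30))
        .build())
    .build());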

There currently isn't a specific implementation for GCS, but S3FileIO would
be a good reference for how to build a more tailored implementation.

Your example in-memory implementation is fast, but it also introduces a
lot of overhead if you're only reading parts of the file (like a single
row group or just certain projected fields).  I think one approach that
has been discussed but not implemented would be to create a Vectored-IO
implementation that would be able to load just the offsets required for
the projection into memory (this can even be done with parallel requests
to further improve load times).  At Netflix, we have an internal
Vectored-IO implementation, but it requires changes in both Parquet and
the file system implementation that are a little complicated.  The
benefit is that the file system implementation can act as an IO manager
and determine how many requests it wants to make (e.g. read through
small, non-projected columns or make a separate request).
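
To sketch the shape of that idea (this interface is hypothetical, not an
existing Iceberg or Parquet API):

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical: the reader hands over every (offset, length) range the
// projection needs; the implementation decides whether to coalesce
// nearby ranges into one request or fetch them in parallel.
interface VectoredReadable {
    record FileRange(long offset, int length) {}

    List<CompletableFuture<ByteBuffer>> readVectored(List<FileRange> ranges);
}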

You might want to consider experimenting with larger buffer sizes in the
file system or other tuning options that might be available before pulling
the entire file into memory (though that does provide an interesting
comparison point).
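
For GHFS specifically, something like the following might be worth
trying first (these keys exist in recent gcs-connector releases, but
verify the names and defaults against the version you're running):

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName());
// Sequential fadvise favors a few large reads over many small ranged requests.
conf.set("fs.gs.inputstream.fadvise", "SEQUENTIAL");
// Raise the minimum ranged-request size so short seeks read through
// the gap instead of opening a new request.
conf.setLong("fs.gs.inputstream.min.range.request.size", 8 * 1024 * 1024);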

Just my thoughts on this.  Let me know if any of that is unclear,
-Dan



On Tue, Mar 23, 2021 at 1:44 PM Mayur Srivastava <
[email protected]> wrote:

> Hi,
>
>
>
> I’ve been running performance benchmarks on core Iceberg readers on Google
> Cloud Storage (GCS). I would like to share some of my results and check
> whether there are ways to improve performance on S3-like storage in
> general. The details (including sample code) are listed below the question
> section.
>
>
>
> I’ve a few questions related to running Iceberg readers on S3-like storage:
>
> 1.      Are there published benchmarks for Iceberg on S3-like storage
> (GCS, Amazon S3, etc.)?
>
> 2.      Are there some configurations or code changes that improve
> performance (in terms of single reader latency and throughput) on S3-like
> storage?
>
> 3.      One of my first observations is that Iceberg's single-process
> reader throughput on GCS is considerably lower than the local disk
> version's. My initial guess is that the Iceberg read patterns are
> generating several requests to GCS; note that each request to GCS is
> considerably slower than one to the local disk. Has anyone experienced and
> looked into reader slowness on S3-like storage before and could share
> some insights?
>
> 4.      In the benchmarks, I created a naive buffering strategy on top of
> GCS where I buffer the whole file from GCS before running the
> deserialization (hence reducing the number of storage requests to one per
> file). With this file system change, the performance improves again and is
> similar to the local disk version. Are there existing buffering
> implementations for GCS/S3-like file systems that the Iceberg community
> has been using?
>
> 5.      In the past, we have implemented some optimizations for reading
> Apache Arrow datasets from S3-like storage and contributed parts of them to
> the Apache Arrow C++ project. The details are discussed here:
> https://lists.apache.org/thread.html/r9c8d1081e02f479a09015c270364b6fb9e9fa7d9c0d4982f6afab531%40%3Cdev.arrow.apache.org%3E.
> Has there been any similar effort on the Iceberg project?
>
>
>
> *Benchmark data set*
>
> The benchmark data is a monthly partitioned time series table containing a
> timestamp, a key, and n double columns, where n = 50, 100, 200, 300. Each
> monthly partition contains one Parquet file with 180,000 uniformly
> distributed rows. The monthly Parquet files are uncompressed and have the
> following sizes (per Parquet file, i.e. per partition) for n = 50, 100,
> 200, 300 respectively: 69.3 MiB, 138.02 MiB, 275.46 MiB, 412.91 MiB. The
> data is written for 1 year, i.e. 12 partitions. The values in the double
> columns are randomly distributed.
>
>
>
> *Benchmark experiments*
>
> We benchmarked a row-by-row reader and an Arrow reader, running several
> kinds of queries on Iceberg tables on local disk and GCS: read all, read 1
> year, read 1 month, etc. Both readers use the Iceberg core libraries and
> don't depend on any other framework such as Spark. One of the queries of
> interest is reading one month of data, which effectively reads a single
> Parquet file from a monthly partition. The Arrow reader is introduced in
> https://github.com/apache/iceberg/pull/2286 (still under review). In both
> readers, there is a warm-up phase, and then each benchmark query is
> repeated 3 times. We compute the total bytes read from the size of the
> Arrow vectors; this size is also used as a proxy for row-by-row reads. The
> total bytes read is close to the Parquet file size on disk because the
> experiment was run with no compression. The single-reader throughput is
> computed from the total bytes read and the mean time taken to run the
> query.
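>
> To make that computation explicit (variable names are hypothetical):
>
> // MiB/s = (bytes summed over the Arrow vectors / 2^20) / mean seconds per query
> double throughputMiBps = (totalBytesRead / (1024.0 * 1024.0)) / meanQuerySeconds;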
>
>
>
> The benchmark was run on a 16-CPU/58 GB Google Cloud machine.
>
>
>
> *Benchmark reader code*
>
>
>
> *Row-by-row reader code:*
>
>
>
> Expression where = Expressions.and(
>     Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
>     Expressions.lessThan("time", timestampToMicros(endExclusive)));
>
> ScanBuilder builder = IcebergGenerics.read(table).where(where);
>
> try (CloseableIterable<Record> rowReader = builder.build()) {
>     for (Record row : rowReader) {
>         // rows are consumed but not used; the loop only measures read cost
>     }
> }
>
>
>
> *Arrow reader code* (using https://github.com/apache/iceberg/pull/2286):
>
>
>
> Expression where = Expressions.and(
>     Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
>     Expressions.lessThan("time", timestampToMicros(endExclusive)));
>
> TableScan tableScan = table.newScan().filter(where);
>
> try (var iterable = new VectorizedTableScanIterable(tableScan)) {
>     for (ArrowBatch batch : iterable) {
>         VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
>     }
> }
>
>
>
> *Selected results for query = read 1 month:*
>
>
>
> *Query:* read 1 month of data, i.e. read 1 parquet file from a single
> partition.
>
>
>
> *Result using Local Disk as storage:*
>
>
>
> In the first round, I ran the benchmark on the data stored on a local
> disk. The row-by-row reader achieves ~100 MiB/s and the Arrow reader more
> than 300 MiB/s; the Arrow reader is significantly faster than the
> row-by-row reader.
>
>
>
> *Row-by-row reader:*
>
> n=50 read throughput=96.983 MiB/s
>
> n=100 read throughput=98.709 MiB/s
>
> n=200 read throughput=105.242 MiB/s
>
> n=300 read throughput=85.681 MiB/s
>
>
>
> *Arrow reader:*
>
> n=50 read throughput=432.107 MiB/s
>
> n=100 read throughput=396.876 MiB/s
>
> n=200 read throughput=327.403 MiB/s
>
> n=300 read throughput=325.542 MiB/s
>
>
>
> *Result using GCS as storage and GoogleHadoopFileSystem in the Hadoop
> configuration:*
>
>
>
> In the second round, I ran the benchmark on the data stored on GCS. The
> row-by-row reader falls to 25-40 MiB/s and the Arrow reader to 35-60
> MiB/s. This is a big drop in single-reader throughput. It seems that the
> read patterns are generating several seeks and read requests on GCS.
>
>
>
> *Config code:*
>
> conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName())
>
>
>
> *Row-by-row reader:*
>
> n=50 read throughput=37.515 MiB/s
>
> n=100 read throughput=38.745 MiB/s
>
> n=200 read throughput=32.282 MiB/s
>
> n=300 read throughput=25.943 MiB/s
>
>
>
> *Arrow reader:*
>
> n=50 read throughput=55.272 MiB/s
>
> n=100 read throughput=59.164 MiB/s
>
> n=200 read throughput=51.068 MiB/s
>
> n=300 read throughput=34.020 MiB/s
>
>
>
> *Result using GCS as storage and naive buffering on top of
> GoogleHadoopFileSystem in the Hadoop configuration:*
>
>
>
> *Config code* (experimental/not committed):
>
> conf.set("fs.gs.impl", InMemoryLoadingFileSystem.class.getName())
>
>
>
> I implemented a naive buffering strategy on top of GoogleHadoopFileSystem
> which buffers the entire file (for these experiments) before deserializing
> the data. This change improves the throughput to almost the same level as
> the local disk.
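>
> A minimal sketch of this buffering idea (class and method names are
> hypothetical; InMemoryLoadingFileSystem follows the same pattern): make
> one large storage request per file, then serve all of Parquet's seeks
> from memory.
>
> import java.io.ByteArrayInputStream;
> import java.io.EOFException;
> import java.io.IOException;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.PositionedReadable;
> import org.apache.hadoop.fs.Seekable;
>
> // Hypothetical helper: fetch the whole object with one read, then wrap
> // it in a stream that satisfies FSDataInputStream's requirements.
> static FSDataInputStream openBuffered(FileSystem delegate, Path path) throws IOException {
>     byte[] buf = new byte[(int) delegate.getFileStatus(path).getLen()];  // assumes the file fits in memory
>     try (FSDataInputStream in = delegate.open(path)) {
>         in.readFully(0, buf);  // single large read from GCS
>     }
>     return new FSDataInputStream(new SeekableByteArrayInputStream(buf));
> }
>
> // FSDataInputStream requires its wrapped stream to implement Seekable
> // and PositionedReadable; this serves both from the in-memory copy.
> static class SeekableByteArrayInputStream extends ByteArrayInputStream
>         implements Seekable, PositionedReadable {
>     SeekableByteArrayInputStream(byte[] buf) { super(buf); }
>
>     @Override public void seek(long target) { pos = (int) target; }
>     @Override public long getPos() { return pos; }
>     @Override public boolean seekToNewSource(long target) { return false; }
>
>     @Override public int read(long position, byte[] b, int off, int len) {
>         int n = Math.min(len, count - (int) position);
>         if (n <= 0) return -1;
>         System.arraycopy(buf, (int) position, b, off, n);
>         return n;
>     }
>
>     @Override public void readFully(long position, byte[] b, int off, int len) throws IOException {
>         if (read(position, b, off, len) < len) throw new EOFException();
>     }
>
>     @Override public void readFully(long position, byte[] b) throws IOException {
>         readFully(position, b, 0, b.length);
>     }
> }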
>
>
>
> *Row-by-row reader:*
>
> n=50 read throughput=92.688 MiB/s
>
> n=100 read throughput=100.278 MiB/s
>
> n=200 read throughput=105.137 MiB/s
>
> n=300 read throughput=106.432 MiB/s
>
>
>
> *Arrow reader:*
>
> n=50 read throughput=282.355 MiB/s
>
> n=100 read throughput=264.336 MiB/s
>
> n=200 read throughput=320.418 MiB/s
>
> n=300 read throughput=338.855 MiB/s
>
>
>
> Thanks,
>
> Mayur
>
>
>
