You can use S3FileIO with any catalog implementation, including
HadoopCatalog and HiveCatalog, by setting the io-impl catalog property.
Details are described at
https://iceberg.apache.org/custom-catalog/#custom-file-io-implementation
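
For example, a minimal sketch of wiring it up (this assumes the iceberg-aws
module is on the classpath; the warehouse location and catalog name below are
made up):

Map<String, String> props = new HashMap<>();
props.put("warehouse", "s3://my-bucket/warehouse");          // hypothetical location
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");  // swap HadoopFileIO for S3FileIO

Catalog catalog = CatalogUtil.loadCatalog(
    "org.apache.iceberg.hive.HiveCatalog", "my_catalog", props, new Configuration());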

It would be very interesting to see how it performs versus HadoopFileIO; I
am looking forward to the results!

-Jack

On Tue, Mar 23, 2021 at 6:05 PM Mayur Srivastava <
[email protected]> wrote:

> Dan, thanks for getting back to me!
>
>
>
> I’ve not experimented with S3FileIO, and you are right, I’m using
> HadoopFileIO through HadoopTables. I’ve seen some example usage of S3FileIO
> in the Glue catalog implementation. Are there other catalogs that support
> S3FileIO?
>
>
>
> The in-memory implementation is just a naïve implementation that aims to
> get best-case performance numbers for reading entire tables as-is, without
> any column projection.
>
>
>
> It is great to know about the Vectored-IO implementation. It sounds a bit
> similar to an in-house S3+Parquet dataset optimization we have. We
> published a document on these optimizations to contribute to the Apache
> Arrow community, described here:
> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> (I referred to this in question #5). Would it be possible to collaborate in
> the future to open source these optimizations? This would be quite useful
> for us and, hopefully, for the community.
>
>
>
> Meanwhile, I’ll check out S3FileIO and try to change my benchmarks to use
> it.
>
>
>
> Thanks,
>
> Mayur
>
>
>
>
>
> *From:* Daniel Weeks <[email protected]>
> *Sent:* Tuesday, March 23, 2021 7:17 PM
> *To:* Iceberg Dev List <[email protected]>
> *Subject:* Re: Single Reader Benchmarks on S3-like Storage
>
>
>
> Hey Mayur, thanks for the detailed writeup.
>
>
>
> I would say that what you're looking at in terms of performance is very
> specific to the file system implementation (as you've already discovered by
> replacing the GHFS implementation).
>
>
>
> Within Iceberg, this is scoped very specifically to the FileIO
> implementation.  By default you will likely use the HadoopFileIO (wrapping
> the Hadoop local file system implementation, S3A, or the GHFS one you
> probably used by default).
>
>
>
> With S3, there's a specific S3FileIO implementation that would likely
> perform better than S3A because it requires fewer requests, and even in
> that case the behavior is tunable via the S3 client supplied to the
> S3FileIO.
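>
> As a rough sketch (worth double-checking against the S3FileIO javadoc), you
> can hand S3FileIO a supplier that builds a tuned AWS SDK v2 client, e.g.
> with a larger connection pool:
>
> S3FileIO io = new S3FileIO(() -> S3Client.builder()
>     .region(Region.US_EAST_1)
>     .httpClientBuilder(ApacheHttpClient.builder().maxConnections(100))
>     .build());
> InputFile input = io.newInputFile("s3://bucket/path/file.parquet");  // hypothetical path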
>
>
>
> There currently isn't a specific implementation for GCS, but S3FileIO
> would be a good reference for how to build a more tailored implementation.
>
>
>
> Your example in-memory implementation is fast, but it also introduces a lot
> of overhead if you're just reading parts of the file (like a single row
> group or just certain projected fields).  I think one approach that has
> been discussed but not implemented would be to create a Vectored-IO
> implementation that would be able to load just the offsets required for the
> projection into memory (this can even be done with parallel requests to
> further improve load times).  At Netflix, we have an internal version of a
> Vectored-IO implementation, but it requires changes in both Parquet and the
> file system implementation that are a little complicated.  The benefit is
> that the file system implementation can act as an IO manager and determine
> how many requests it wants to make (e.g. read through small, non-projected
> columns or make a separate request).
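>
> Purely as an illustration of the idea (not our internal implementation, and
> assuming fs is the Hadoop FileSystem, path is the Parquet file, and
> projectedChunkRanges holds hypothetical (offset, length) pairs for the
> projected column chunks), the reads would look something like:
>
> ExecutorService pool = Executors.newFixedThreadPool(4);
> List<Future<byte[]>> reads = new ArrayList<>();
> for (long[] range : projectedChunkRanges) {
>     reads.add(pool.submit(() -> {
>         byte[] buf = new byte[(int) range[1]];
>         try (FSDataInputStream in = fs.open(path)) {
>             in.readFully(range[0], buf);  // one ranged request per projected chunk
>         }
>         return buf;
>     }));
> }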
>
>
>
> You might want to consider experimenting with larger buffer sizes in the
> file system or other tuning options that might be available before pulling
> the entire file into memory (though that does provide an interesting
> comparison point).
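>
> For example (property name taken from the GCS connector docs, so please
> verify it against the connector version you're running), the connector's
> fadvise mode controls how it trades read-ahead against small ranged
> requests:
>
> conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName());
> conf.set("fs.gs.inputstream.fadvise", "SEQUENTIAL");  // or AUTO / RANDOM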
>
>
>
> Just my thoughts on this.  Let me know if any of that is unclear,
>
> -Dan
>
>
>
>
>
>
>
> On Tue, Mar 23, 2021 at 1:44 PM Mayur Srivastava <
> [email protected]> wrote:
>
> Hi,
>
>
>
> I’ve been running performance benchmarks on core Iceberg readers on Google
> Cloud Storage (GCS). I would like to share some of my results and check
> whether there are ways to improve performance on S3-like storage in
> general. The details (including sample code) are listed below the question
> section.
>
>
>
> I’ve a few questions related to running Iceberg readers on S3-like storage:
>
> 1.      Are there published benchmarks for Iceberg on S3-like storage
> (GCS, Amazon S3, etc.)?
>
> 2.      Are there some configurations or code changes that improve
> performance (in terms of single reader latency and throughput) on S3-like
> storage?
>
> 3.      One of my first observations is that Iceberg’s single-process
> reader throughput on GCS is considerably lower than the local-disk version.
> My initial guess is that the Iceberg read patterns generate many requests
> to GCS, and each request on GCS is considerably slower than one on local
> disk. Has anyone looked into reader slowness on S3-like storage before and
> could share some insights?
>
> 4.      In the benchmarks, I created a naive buffering strategy on top of
> GCS that buffers the whole file from GCS before running the
> deserialization (hence reducing the number of object-store calls to one).
> With this file system change, the performance improves again and is similar
> to the local disk version. Are there existing buffering implementations for
> GCS/S3-like file systems that the Iceberg community has been using?
>
> 5.      In the past, we have implemented some optimizations for reading
> Apache Arrow datasets from S3-like storage and contributed parts of them to
> the Apache Arrow C++ project. The details are discussed here:
> https://lists.apache.org/thread.html/r9c8d1081e02f479a09015c270364b6fb9e9fa7d9c0d4982f6afab531%40%3Cdev.arrow.apache.org%3E.
> Has there been any similar effort on the Iceberg project?
>
>
>
> *Benchmark data set*
>
> The benchmark data is a monthly-partitioned time series table containing a
> timestamp, a key, and n double columns, where n = 50, 100, 200, 300. Each
> monthly partition contains one Parquet file with 180,000 uniformly
> distributed rows. The monthly Parquet files are uncompressed and have the
> following sizes (per Parquet file, i.e. per partition) for n = 50, 100,
> 200, 300 respectively: 69.3 MiB, 138.02 MiB, 275.46 MiB, 412.91 MiB. The
> data is written for 1 year, i.e. 12 partitions. The values in the double
> columns are randomly distributed.
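>
> For reference, a rough sketch of how such a table can be declared through
> HadoopTables (the double-column names, number of columns, and location are
> made up for illustration):
>
> Schema schema = new Schema(
>     Types.NestedField.required(1, "time", Types.TimestampType.withZone()),
>     Types.NestedField.required(2, "key", Types.StringType.get()),
>     Types.NestedField.optional(3, "c1", Types.DoubleType.get()),
>     Types.NestedField.optional(4, "c2", Types.DoubleType.get()));
> PartitionSpec spec = PartitionSpec.builderFor(schema).month("time").build();
> Table table = new HadoopTables(conf).create(schema, spec, "gs://bucket/benchmark_table");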
>
>
>
> *Benchmark experiments*
>
> We benchmarked a row-by-row reader and an Arrow reader that run several
> kinds of queries on the Iceberg tables on local disk and GCS: read all,
> read 1 year, read 1 month, etc. Both readers use Iceberg core libraries and
> don’t depend on any other framework such as Spark. One of the queries of
> interest is reading one month of data, which effectively reads a single
> Parquet file from a monthly partition. The Arrow reader is introduced in
> https://github.com/apache/iceberg/pull/2286 (still under review). In both
> readers, there is a warm-up phase and then each benchmark query is repeated
> 3 times. We compute the total bytes read by summing the sizes of the Arrow
> vectors; this size is also used as a proxy for row-by-row reads. The total
> bytes read is close to the Parquet file size on disk because the experiment
> was performed with no compression. The single-reader throughput is computed
> from the total bytes read and the mean time taken to run the query.
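>
> As a concrete example, for the n=50 table one monthly file is 69.3 MiB, so a
> mean read time of ~0.7 s corresponds to roughly 100 MiB/s, while ~1.85 s
> corresponds to roughly 37 MiB/s.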
>
>
>
> The benchmark was run on a 16-cpu/58G google cloud machine.
>
>
>
> *Benchmark reader code*
>
>
>
> *Row-by-row reader code:*
>
>
>
> Expression where = Expressions.and(
>     Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
>     Expressions.lessThan("time", timestampToMicros(endExclusive)));
>
> ScanBuilder builder = IcebergGenerics.read(table).where(where);
>
> try (CloseableIterable<Record> rowReader = builder.build()) {
>     for (Record row : rowReader) {
>         // iterate every row; the benchmark does no per-row work
>     }
> }
>
>
>
> *Arrow reader code* (using https://github.com/apache/iceberg/pull/2286):
>
>
>
> Expression where = Expressions.and(
>     Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
>     Expressions.lessThan("time", timestampToMicros(endExclusive)));
>
> TableScan tableScan = table.newScan().filter(where);
>
> try (var iterable = new VectorizedTableScanIterable(tableScan)) {
>     for (ArrowBatch batch : iterable) {
>         // materialize the Arrow vectors for the batch; no further processing
>         VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
>     }
> }
>
>
>
> *Selected results for query = read 1 month:*
>
>
>
> *Query:* read 1 month of data, i.e. read one Parquet file from a single
> partition.
>
>
>
> *Result using Local Disk as storage:*
>
>
>
> In the first round, I ran the benchmark on the data stored on a local
> disk. The throughput of the row-by-row reader is ~100 MiB/s and that of the
> Arrow reader is >300 MiB/s. The Arrow reader is significantly faster than
> the row-by-row reader.
>
>
>
> *Row-by-row reader:*
>
> n=50 read throughput=96.983 MiB/s
>
> n=100 read throughput=98.709 MiB/s
>
> n=200 read throughput=105.242 MiB/s
>
> n=300 read throughput=85.681 MiB/s
>
>
>
> *Arrow reader:*
>
> n=50 read throughput=432.107 MiB/s
>
> n=100 read throughput=396.876 MiB/s
>
> n=200 read throughput=327.403 MiB/s
>
> n=300 read throughput=325.542 MiB/s
>
>
>
> *Result using GCS as storage and GoogleHadoopFileSystem in the Hadoop
> configuration:*
>
>
>
> In the second round, I ran the benchmark on the data stored on GCS. The
> throughput of the row-by-row reader falls to 25-40 MiB/s and that of the
> Arrow reader falls to 35-60 MiB/s. This is a big drop in single-reader
> throughput. It seems that the read patterns generate many seeks and read
> requests on GCS.
>
>
>
> *Config code:*
>
> conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName())
>
>
>
> *Row-by-row reader:*
>
> n=50 read throughput=37.515 MiB/s
>
> n=100 read throughput=38.745 MiB/s
>
> n=200 read throughput=32.282 MiB/s
>
> n=300 read throughput=25.943 MiB/s
>
>
>
> *Arrow reader:*
>
> n=50 read throughput=55.272 MiB/s
>
> n=100 read throughput=59.164 MiB/s
>
> n=200 read throughput=51.068 MiB/s
>
> n=300 read throughput=34.020 MiB/s
>
>
>
> *Result using GCS as storage and a naive buffering on top of
> GoogleHadoopFileSystem in the Hadoop configuration:*
>
>
>
> *Config code* (experimental/not committed):
>
> conf.set("fs.gs.impl", InMemoryLoadingFileSystem.class.getName())
>
>
>
> I implemented a naive buffering strategy on top of GoogleHadoopFileSystem
> that buffers the entire file (for these experiments) before deserializing
> the data. This change brings the throughput back to almost the same level
> as the local disk.
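>
> For reference, a minimal sketch of this kind of wrapper (the actual
> benchmark class is not shown here; this assumes the InMemoryLoadingFileSystem
> name from the config snippet above, and imports and error handling are
> elided):
>
> public class InMemoryLoadingFileSystem extends GoogleHadoopFileSystem {
>
>     @Override
>     public FSDataInputStream open(Path path, int bufferSize) throws IOException {
>         // Pull the whole object down in one bulk read, then serve all seeks and
>         // positioned reads from the in-memory copy.
>         int len = (int) getFileStatus(path).getLen();
>         byte[] data = new byte[len];
>         try (FSDataInputStream in = super.open(path, bufferSize)) {
>             in.readFully(0, data);
>         }
>         return new FSDataInputStream(new SeekableByteArrayInputStream(data));
>     }
>
>     private static class SeekableByteArrayInputStream extends ByteArrayInputStream
>             implements Seekable, PositionedReadable {
>
>         SeekableByteArrayInputStream(byte[] data) {
>             super(data);
>         }
>
>         @Override
>         public void seek(long newPos) {
>             pos = (int) newPos;
>         }
>
>         @Override
>         public long getPos() {
>             return pos;
>         }
>
>         @Override
>         public boolean seekToNewSource(long targetPos) {
>             return false;
>         }
>
>         @Override
>         public int read(long position, byte[] buffer, int offset, int length) {
>             if (position >= count) {
>                 return -1;
>             }
>             int n = Math.min(length, count - (int) position);
>             System.arraycopy(buf, (int) position, buffer, offset, n);
>             return n;
>         }
>
>         @Override
>         public void readFully(long position, byte[] buffer, int offset, int length)
>                 throws IOException {
>             if (read(position, buffer, offset, length) < length) {
>                 throw new EOFException("Read past end of buffered file");
>             }
>         }
>
>         @Override
>         public void readFully(long position, byte[] buffer) throws IOException {
>             readFully(position, buffer, 0, buffer.length);
>         }
>     }
> }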
>
>
>
> *Row-by-row reader:*
>
> n=50 read throughput=92.688 MiB/s
>
> n=100 read throughput=100.278 MiB/s
>
> n=200 read throughput=105.137 MiB/s
>
> n=300 read throughput=106.432 MiB/s
>
>
>
> *Arrow reader:*
>
> n=50 read throughput=282.355 MiB/s
>
> n=100 read throughput=264.336 MiB/s
>
> n=200 read throughput=320.418 MiB/s
>
> n=300 read throughput=338.855 MiB/s
>
>
>
> Thanks,
>
> Mayur
>
>
>
>
