You can use S3FileIO with any catalog implementation, including HadoopCatalog and HiveCatalog, by setting the io-impl catalog property. Details are described in https://iceberg.apache.org/custom-catalog/#custom-file-io-implementation
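For illustration, a minimal sketch of wiring S3FileIO into a HiveCatalog through the io-impl property. The catalog name, warehouse path, and Hadoop Configuration are placeholders; the property constants come from the Iceberg core/AWS modules, so verify them against your Iceberg version:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;

// Hypothetical example: metastore settings live in the Hadoop Configuration.
Configuration conf = new Configuration();
HiveCatalog catalog = new HiveCatalog();
catalog.setConf(conf);
catalog.initialize("my_catalog", Map.of(
    CatalogProperties.WAREHOUSE_LOCATION, "s3://my-bucket/warehouse",      // placeholder location
    CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO"));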
It would be very interesting to see how it performs versus HadoopFileIO, I am looking forward to the results!

-Jack

On Tue, Mar 23, 2021 at 6:05 PM Mayur Srivastava <[email protected]> wrote:

Dan, thanks for getting back to me!

I've not experimented with S3FileIO, and you are right, I'm using HadoopFileIO through HadoopTables. I've seen some example usage of S3FileIO in the Glue catalog implementation. Are there other catalogs that support S3FileIO?

The in-memory implementation is just a naive implementation that aims to get the best-case performance numbers for reading entire tables as-is, without any column projection.

It is great to know about the Vectored-IO implementation. It sounds a bit similar to an in-house S3+Parquet dataset optimization we have. We published a document on these optimizations to contribute to the Apache Arrow community, described here: https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit (I referred to this in question #5). Will it be possible to collaborate in the future to open source these optimizations? This will be quite useful for us and hopefully for the community.

Meanwhile, I'll check out S3FileIO and try to change my benchmarks to use it.

Thanks,

Mayur

*From:* Daniel Weeks <[email protected]>
*Sent:* Tuesday, March 23, 2021 7:17 PM
*To:* Iceberg Dev List <[email protected]>
*Subject:* Re: Single Reader Benchmarks on S3-like Storage

Hey Mayur, thanks for the detailed writeup.

I would say that what you're looking at in terms of performance is very specific to the file system implementation (as you've already discovered by replacing the GHFS implementation).

Within Iceberg, this is scoped very specifically to the FileIO implementation. By default you will likely use the HadoopFileIO (with the Hadoop local file system implementation, S3A, or the GHFS one you probably used by default).

With S3, there's a specific S3FileIO implementation that would likely perform better than S3A due to requiring fewer requests, and even in that case tuning is somewhat configurable via the S3 client supplied to the S3FileIO.

There currently isn't a specific implementation for GCS, but S3FileIO would be a good reference for how to build a more tailored implementation.

Your example in-memory implementation is fast, but it also introduces a lot of overhead if you're just reading parts of the file (like a single row group or just certain projected fields). I think one approach that has been discussed but not implemented would be a Vectored-IO implementation that loads just the offsets required for the projection into memory (this can even be done with parallel requests to further improve load times). At Netflix, we have an internal version of a Vectored-IO implementation, but it requires changes in both Parquet and the file system implementation that are a little complicated. The benefit is that the file system implementation can act as an IO manager and determine how many requests it wants to make (e.g. read through small, non-projected columns or make a separate request).

You might want to consider experimenting with larger buffer sizes in the file system or other tuning options that might be available before pulling the entire file into memory (though that does provide an interesting comparison point).

Just my thoughts on this. Let me know if any of that is unclear,

-Dan
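To illustrate the S3 client tuning mentioned above, a minimal, hypothetical sketch of supplying a pre-configured client to S3FileIO. It assumes the Iceberg AWS module's S3FileIO(SerializableSupplier<S3Client>) constructor and AWS SDK v2; check the exact signatures for your Iceberg version:

import org.apache.iceberg.aws.s3.S3FileIO;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

// Supply a client factory so region, retry, and connection settings stay under your control.
S3FileIO io = new S3FileIO(() ->
    S3Client.builder()
        .region(Region.US_EAST_1)   // placeholder region
        .build());

// The resulting FileIO can then be used to open data files directly, e.g.:
// InputFile in = io.newInputFile("s3://my-bucket/warehouse/db/table/data/file.parquet");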
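As a counterpart to the buffer-size suggestion, a sketch of the kind of read-tuning knobs the GCS connector exposes through the Hadoop configuration. The property names below vary across gcs-connector versions and are assumptions to verify, not exact keys:

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName());
// Hint that reads are mostly sequential, so the connector issues fewer ranged requests.
conf.set("fs.gs.inputstream.fadvise", "SEQUENTIAL");
// Ask for larger ranged reads per request (key name and default differ by connector version).
conf.set("fs.gs.inputstream.min.range.request.size", String.valueOf(8 * 1024 * 1024));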
On Tue, Mar 23, 2021 at 1:44 PM Mayur Srivastava <[email protected]> wrote:

Hi,

I've been running performance benchmarks on core Iceberg readers on Google Cloud Storage (GCS). I would like to share some of my results and check whether there are ways to improve performance on S3-like storage in general. The details (including sample code) are listed below the question section.

I have a few questions related to running Iceberg readers on S3-like storage:

1. Are there published benchmarks for Iceberg on S3-like storage (GCS, Amazon S3, etc.)?

2. Are there configurations or code changes that improve performance (in terms of single-reader latency and throughput) on S3-like storage?

3. One of my first observations is that Iceberg's single-process reader throughput on GCS is considerably lower than the local disk version. My initial guess is that the Iceberg read patterns are generating several requests to GCS; note that each request on GCS is considerably slower than one on local disk. Has anyone experienced and looked into reader slowness on S3-like storage before and could share some insights?

4. In the benchmarks, I created a naive buffering strategy on top of GCS where I buffer the whole file from GCS before running the deserialization (hence reducing S3 calls to 1). Using this file system change, the performance improves again and is similar to the local disk version. Are there existing buffering implementations for GCS/S3-like file systems that the Iceberg community has been using?

5. In the past, we have implemented some optimizations for reading Apache Arrow datasets from S3-like storage and contributed parts of them to the Apache Arrow C++ project. The details are discussed here: https://lists.apache.org/thread.html/r9c8d1081e02f479a09015c270364b6fb9e9fa7d9c0d4982f6afab531%40%3Cdev.arrow.apache.org%3E. Has there been any similar effort on the Iceberg project?

*Benchmark data set*

The benchmark data is a monthly partitioned time series table containing a timestamp, a key, and n double columns, where n = 50, 100, 200, 300. Each monthly partition contains 1 Parquet file with 180,000 uniformly distributed rows. The monthly Parquet files are uncompressed and have the following sizes (per Parquet file, i.e. per partition) for n = 50, 100, 200, 300 respectively: 69.3 MiB, 138.02 MiB, 275.46 MiB, 412.91 MiB. The data is written for 1 year, i.e. 12 partitions. The values in the double columns are randomly distributed.

*Benchmark experiments*

We benchmarked a row-by-row reader and an Arrow reader that run several kinds of queries against the Iceberg tables on local disk and GCS: read all, read 1 year, read 1 month, etc. Both readers use Iceberg core libraries and don't depend on any other framework such as Spark. One of the queries of interest is reading one month of data, which effectively reads a single Parquet file from a monthly partition. The Arrow reader is introduced in https://github.com/apache/iceberg/pull/2286 (still under review). In both readers, there is a warm-up phase and then each benchmark query is repeated 3 times. We compute the total bytes read by computing the size of the Arrow vectors; this size is also used as a proxy for row-by-row reads. The total bytes read is close to the Parquet file size on disk because the experiment was performed with no compression. The single-reader throughput is computed from the total bytes read and the mean time taken to run the query.
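As an aside on the "size of the Arrow vectors" measurement, a small sketch of how such a byte count can be accumulated per batch with the Arrow Java API (the method names are from arrow-vector; the helper itself is hypothetical, not the benchmark's actual code):

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

// Hypothetical helper: sum the buffer sizes of all vectors in one batch.
static long batchSizeInBytes(VectorSchemaRoot root) {
  long bytes = 0;
  for (FieldVector vector : root.getFieldVectors()) {
    bytes += vector.getBufferSize();   // bytes held across the vector's buffers
  }
  return bytes;
}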
The benchmark was run on a 16-CPU/58 GB Google Cloud machine.

*Benchmark reader code*

*Row-by-row reader code:*

Expression where = Expressions.and(
    Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
    Expressions.lessThan("time", timestampToMicros(endExclusive)));

ScanBuilder builder = IcebergGenerics.read(table).where(where);

try (CloseableIterable<Record> rowReader = builder.build()) {
  for (Record row : rowReader) {
  }
}

*Arrow reader code* (using https://github.com/apache/iceberg/pull/2286):

Expression where = Expressions.and(
    Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
    Expressions.lessThan("time", timestampToMicros(endExclusive)));

TableScan tableScan = table.newScan().filter(where);

try (var iterable = new VectorizedTableScanIterable(tableScan)) {
  for (ArrowBatch batch : iterable) {
    VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
  }
}

*Selected results for query = read 1 month*

*Query:* read 1 month of data, i.e. read 1 Parquet file from a single partition.

*Result using local disk as storage:*

In the first round, I ran the benchmark on data stored on a local disk. The throughput of the row-by-row reader is ~100 MiB/s and that of the Arrow reader is >300 MiB/s. The Arrow reader is significantly faster than the row-by-row reader.

*Row-by-row reader:*
n=50 read throughput=96.983 MiB/s
n=100 read throughput=98.709 MiB/s
n=200 read throughput=105.242 MiB/s
n=300 read throughput=85.681 MiB/s

*Arrow reader:*
n=50 read throughput=432.107 MiB/s
n=100 read throughput=396.876 MiB/s
n=200 read throughput=327.403 MiB/s
n=300 read throughput=325.542 MiB/s

*Result using GCS as storage and GoogleHadoopFileSystem in the Hadoop configuration:*

In the second round, I ran the benchmark on data stored on GCS. The throughput of the row-by-row reader falls to 25-40 MiB/s and that of the Arrow reader falls to 35-60 MiB/s. This is a big drop in single-reader throughput. It seems that the read patterns are generating several seek and read requests on GCS.

*Config code:*

conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName())

*Row-by-row reader:*
n=50 read throughput=37.515 MiB/s
n=100 read throughput=38.745 MiB/s
n=200 read throughput=32.282 MiB/s
n=300 read throughput=25.943 MiB/s

*Arrow reader:*
n=50 read throughput=55.272 MiB/s
n=100 read throughput=59.164 MiB/s
n=200 read throughput=51.068 MiB/s
n=300 read throughput=34.020 MiB/s

*Result using GCS as storage and naive buffering on top of GoogleHadoopFileSystem in the Hadoop configuration:*

*Config code* (experimental/not committed):

conf.set("fs.gs.impl", InMemoryLoadingFileSystem.class.getName())

I implemented a naive buffering strategy on top of GoogleHadoopFileSystem which buffers the entire file (for these experiments) before deserializing the data. This change improves the throughput to almost the same level as the local disk.
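The InMemoryLoadingFileSystem above is experimental and not committed, so purely as an illustration, here is a minimal sketch of what such a "buffer the whole object" wrapper could look like on top of the GCS connector. The class shape and names are assumptions for this sketch, not the actual implementation:

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;

public class BufferingFileSystem extends GoogleHadoopFileSystem {
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    long len = getFileStatus(path).getLen();
    byte[] data = new byte[(int) len];                 // assumes the file fits in memory
    try (FSDataInputStream in = super.open(path, bufferSize)) {
      in.readFully(0, data);                           // one bulk read from GCS
    }
    return new FSDataInputStream(new SeekableByteArrayInputStream(data));
  }

  /** Minimal in-memory stream satisfying the seek/positioned-read contract. */
  private static final class SeekableByteArrayInputStream extends ByteArrayInputStream
      implements Seekable, PositionedReadable {
    SeekableByteArrayInputStream(byte[] data) { super(data); }

    @Override public void seek(long newPos) { this.pos = (int) newPos; }
    @Override public long getPos() { return pos; }
    @Override public boolean seekToNewSource(long targetPos) { return false; }

    @Override public int read(long position, byte[] buffer, int offset, int length) {
      int toCopy = Math.min(length, count - (int) position);
      if (toCopy <= 0) return -1;
      System.arraycopy(buf, (int) position, buffer, offset, toCopy);
      return toCopy;
    }
    @Override public void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException {
      if (read(position, buffer, offset, length) < length) throw new IOException("EOF");
    }
    @Override public void readFully(long position, byte[] buffer) throws IOException {
      readFully(position, buffer, 0, buffer.length);
    }
  }
}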
*Row-by-row reader:*
n=50 read throughput=92.688 MiB/s
n=100 read throughput=100.278 MiB/s
n=200 read throughput=105.137 MiB/s
n=300 read throughput=106.432 MiB/s

*Arrow reader:*
n=50 read throughput=282.355 MiB/s
n=100 read throughput=264.336 MiB/s
n=200 read throughput=320.418 MiB/s
n=300 read throughput=338.855 MiB/s

Thanks,

Mayur
