Hi Ryan, I’ve submitted a pr (https://github.com/apache/iceberg/pull/2286) for the vectorized arrow reader:
This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the pr. I've refactored some code from the Spark vectorized reader and added an ArrowReader which is a vectorized reader in Iceberg core. About the ArrowReader: 1. I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved. 2. I had to make a dependency addition of ‘iceberg-arrow’ for the iceberg-data module. Specially for the ArrowReaderTest, I had to add the following. Let me know if there is a better way for doing this. compileOnly("org.apache.arrow:arrow-vector") { exclude group: 'io.netty', module: 'netty-buffer' exclude group: 'com.google.code.findbugs', module: 'jsr305' } 3. Most of the code in ArrowReader is taken from the spark vectorized reader. I think there is a potential to share ‘BaseArrowFileScanTaskReader’ in both versions, but I did not attempt to do it yet. 4. ArrowReader returns an iterator of VectorSchemaRoot and the behavior is explained in the Javadoc. 5. Some small changes were needed in IcebergGenerics to expose the table scan object and VectorizedArrowReader to allocate different timestamp Arrow vectors based on with/without timezone. 6. All prepush gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink). 7. I've not performed any performance tests with the implementation yet. I'm planning to do so this week. Following are some limitations/questions for this implementation: 1. The arrow vector type is coupled with the physical data type in the parquet file: When column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test ArrowReaderTest.testReadAllWithConstantRecords() that is currently ignored. 2. Type promotion does not work: In the ArrowReaderTest, the data for column ‘int_promotion’ was written as int, and then type was promoted from int to long, but the Arrow reader still returns IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type. 3. Data type limitations: a. Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to parquet? b. Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types? Thanks, Mayur From: Mayur Srivastava <mayur.srivast...@twosigma.com> Sent: Friday, February 12, 2021 7:41 PM To: dev@iceberg.apache.org; rb...@netflix.com Subject: RE: Reading data from Iceberg table into Apache Arrow in Java Thank you Ryan. I’ll dig into the file scan plan and Spark codebase to learn about the internals of Iceberg vectorized read path. Then, I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back to the upstream. I’ll get back to you if I’ve any question or need any more pointers. Thanks, Mayur From: Ryan Blue <rb...@netflix.com.INVALID<mailto:rb...@netflix.com.INVALID>> Sent: Friday, February 12, 2021 2:26 PM To: Iceberg Dev List <dev@iceberg.apache.org<mailto:dev@iceberg.apache.org>> Subject: Re: Reading data from Iceberg table into Apache Arrow in Java Hi Mayur, We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark. The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in. The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas. So you'd need to still fall back to the row-based path. (Side note, if you have code to convert generics to Arrow, that's really useful to post somewhere.) I hope that helps. It would be great to work with you to improve this in a couple of PRs! rb On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <mayur.srivast...@twosigma.com<mailto:mayur.srivast...@twosigma.com>> wrote: Hi, We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) from a bespoke table-backend based on a S3 compatible storage. We are trying to replace our bespoke table-backend with Iceberg tables. For integrating with Iceberg, we are using Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row-by-row. Instead of this (potentially slower) read path which needs conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot as a callback or Arrow record batches as the result set. I have noticed that Iceberg already has an Arrow module https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But, I’m not sure about the current status of the vectorized reader support. I’m also not sure how this Arrow module is being used to perform a vectorized read to execute a query on an Iceberg table in the core/data/parquet library. I have a few questions regarding the Vectorized reader/Arrow support: 1. Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java? 2. Is there an example of reading data in Arrow format from an Iceberg table? 3. Is the Spark read path completely vectorized? I ask this question to find out if we can borrow from the vectorized Spark reader or we can move code from vectorized Spark reader to the Iceberg core library. Let me know if you have any questions for me. Thanks, Mayur -- Ryan Blue Software Engineer Netflix