Hi Mayur,

I've been playing around with the idea of implementing vectorized reads for Hive, so your message came just in time :)
Took a quick look at the code, but I do not really understand how vectorized reads handle deletes. In the non-vectorized code path I have found this, which filters the rows one by one: https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

    DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
    Schema requiredSchema = deletes.requiredSchema();
    return deletes.filter(openTask(currentTask, requiredSchema));

In your code I have found that the delete files' encryption keys are collected, but I am not sure how they are used: https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
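To check my own understanding, this is the pattern I believe the generic path follows; a rough sketch only, where the class and field names are mine and openTask() stands in for whatever actually opens the Parquet/Avro/ORC file reader (only the DeleteFilter calls come from the linked code):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.DeleteFilter;
    import org.apache.iceberg.data.GenericDeleteFilter;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.FileIO;

    // Sketch of the non-vectorized read path with delete handling.
    abstract class RowReaderSketch {
      private final FileIO io;
      private final Schema tableSchema;  // full table schema
      private final Schema readSchema;   // projection requested by the scan

      RowReaderSketch(FileIO io, Schema tableSchema, Schema readSchema) {
        this.io = io;
        this.tableSchema = tableSchema;
        this.readSchema = readSchema;
      }

      // Placeholder for the method that opens the underlying file reader.
      abstract CloseableIterable<Record> openTask(FileScanTask task, Schema projection);

      CloseableIterable<Record> openWithDeletes(FileScanTask currentTask) {
        DeleteFilter<Record> deletes =
            new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);

        // The filter may need extra columns (e.g. the row position for
        // positional deletes), so the file is read with a widened schema.
        Schema requiredSchema = deletes.requiredSchema();

        // filter() wraps the row iterable and drops rows matching an
        // equality or positional delete, one row at a time.
        return deletes.filter(openTask(currentTask, requiredSchema));
      }
    }

As I read it, filter() drops rows one by one, which is exactly the per-row work I would hope a vectorized path can avoid or batch somehow.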
Could you please help me with some quick pointers?

Thanks,
Peter

> On Mar 1, 2021, at 16:17, Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>
> Hi Ryan,
>
> I've submitted a pr (https://github.com/apache/iceberg/pull/2286) for the vectorized arrow reader.
>
> This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the pr. I've refactored some code from the Spark vectorized reader and added an ArrowReader, which is a vectorized reader in Iceberg core.
>
> About the ArrowReader:
> 1. I've put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.
> 2. I had to add a dependency on 'iceberg-arrow' to the iceberg-data module. Specifically for the ArrowReaderTest, I had to add the following; let me know if there is a better way of doing this.
>        compileOnly("org.apache.arrow:arrow-vector") {
>            exclude group: 'io.netty', module: 'netty-buffer'
>            exclude group: 'com.google.code.findbugs', module: 'jsr305'
>        }
> 3. Most of the code in ArrowReader is taken from the Spark vectorized reader. I think there is potential to share 'BaseArrowFileScanTaskReader' between both versions, but I have not attempted that yet.
> 4. ArrowReader returns an iterator of VectorSchemaRoot; the behavior is explained in the Javadoc.
> 5. Some small changes were needed in IcebergGenerics, to expose the table scan object, and in VectorizedArrowReader, to allocate different timestamp Arrow vectors based on with/without timezone.
> 6. All prepush gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).
> 7. I've not performed any performance tests with the implementation yet. I'm planning to do so this week.
>
> Following are some limitations/questions for this implementation:
> 1. The Arrow vector type is coupled to the physical data type in the parquet file: when column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test, ArrowReaderTest.testReadAllWithConstantRecords(), that is currently ignored.
> 2. Type promotion does not work: in the ArrowReaderTest, the data for column 'int_promotion' was written as int and the type was then promoted from int to long, but the Arrow reader still returns an IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.
> 3. Data type limitations:
>    a. Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to parquet?
>    b. Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?
>
> Thanks,
> Mayur
>
> From: Mayur Srivastava <mayur.srivast...@twosigma.com>
> Sent: Friday, February 12, 2021 7:41 PM
> To: dev@iceberg.apache.org; rb...@netflix.com
> Subject: RE: Reading data from Iceberg table into Apache Arrow in Java
>
> Thank you Ryan.
>
> I'll dig into the file scan plan and the Spark codebase to learn about the internals of the Iceberg vectorized read path. Then I'll try to implement the vectorized reader using core components only. I'll be happy to work with you to contribute it back upstream. I'll get back to you if I have any questions or need more pointers.
>
> Thanks,
> Mayur
>
> From: Ryan Blue <rb...@netflix.com.INVALID>
> Sent: Friday, February 12, 2021 2:26 PM
> To: Iceberg Dev List <dev@iceberg.apache.org>
> Subject: Re: Reading data from Iceberg table into Apache Arrow in Java
>
> Hi Mayur,
>
> We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.
>
> The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches instead. You can do that by changing the `Parquet.read` call and passing a callback that creates an Arrow batch reader rather than a generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it, we'd be happy to help you get it in.
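> To sketch the shape of the change: the row-based call looks roughly like the first snippet below, and the vectorized variant swaps in the batched reader function. Treat this as a sketch only; createBatchedReaderFunc is what the Spark path uses, and buildArrowBatchReader is a made-up name for the Arrow batch reader factory you would have to write:
>
>     // Parquet is org.apache.iceberg.parquet.Parquet; GenericParquetReaders
>     // is org.apache.iceberg.data.parquet.GenericParquetReaders.
>
>     // Row-based generic read (what IcebergGenerics wires up per file):
>     CloseableIterable<Record> rows =
>         Parquet.read(inputFile)
>             .project(readSchema)
>             .createReaderFunc(fileSchema ->
>                 GenericParquetReaders.buildReader(readSchema, fileSchema))
>             .build();
>
>     // Vectorized variant: same builder, but a batched reader function.
>     // buildArrowBatchReader is hypothetical; the Spark reader passes its
>     // own VectorizedSparkParquetReaders.buildReader(...) here instead.
>     CloseableIterable<VectorSchemaRoot> batches =
>         Parquet.read(inputFile)
>             .project(readSchema)
>             .createBatchedReaderFunc(fileSchema ->
>                 buildArrowBatchReader(readSchema, fileSchema))
>             .build();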
> The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas, so you'd still need to fall back to the row-based path for those. (Side note: if you have code to convert generics to Arrow, that would be really useful to post somewhere.)
>
> I hope that helps. It would be great to work with you to improve this in a couple of PRs!
>
> rb
>
> On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>
> Hi,
>
> We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) against a bespoke table backend built on S3-compatible storage.
>
> We are trying to replace our bespoke table backend with Iceberg tables. For integrating with Iceberg, we are using the Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row by row. Instead of this (potentially slower) read path, which needs conversion between rows and an Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot via a callback, or Arrow record batches as the result set.
>
> I have noticed that Iceberg already has an Arrow module: https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But I'm not sure about the current status of vectorized reader support, and I'm also not sure how this Arrow module is used to perform a vectorized read that executes a query on an Iceberg table in the core/data/parquet library.
>
> I have a few questions regarding the vectorized reader/Arrow support:
> 1. Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java?
> 2. Is there an example of reading data in Arrow format from an Iceberg table?
> 3. Is the Spark read path completely vectorized? I ask this question to find out whether we can borrow from the vectorized Spark reader, or whether we can move code from the vectorized Spark reader into the Iceberg core library.
>
> Let me know if you have any questions for me.
>
> Thanks,
> Mayur
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix