Thanks for finding that out, Peter. Should we proceed with this PR and add support for vectorized reads with deletes in a separate PR later? The current PR also has some other limitations (listed in the PR) which could be addressed in subsequent PRs.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>
Sent: Tuesday, March 2, 2021 10:38 AM
To: Iceberg Dev List <dev@iceberg.apache.org>
Cc: Ryan Blue <rb...@netflix.com>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

After some more digging, I think there is no solution yet for vectorized reads with deletes. The batch read path is only enabled when there are no delete files:

    this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
        (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));

Mayur Srivastava <mayur.srivast...@twosigma.com> wrote (on Tue, Mar 2, 2021, 15:48):

Hi Peter,

Good point. Most of the ArrowReader code is inspired by Spark’s vectorized reader (src: https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68).

We’ll probably need some help from someone who understands the Spark vectorized read path. But I’ll read the code to understand the deletes.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>
Sent: Tuesday, March 2, 2021 8:51 AM
To: Iceberg Dev List <dev@iceberg.apache.org>
Cc: rb...@netflix.com
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

I have been playing around with the idea of implementing vectorized reads for Hive, so your message came just in time :)

I took a quick look at the code, but I do not really understand how vectorized reads handle deletes.

In the non-vectorized code path I found this, which filters the rows one by one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

    DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
    Schema requiredSchema = deletes.requiredSchema();
    return deletes.filter(openTask(currentTask, requiredSchema));

In your code I found that the encryption keys of the delete files are collected, but I am not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));

Could you please help me with some quick pointers?

Thanks,
Peter

On Mar 1, 2021, at 16:17, Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

Hi Ryan,

I’ve submitted a PR (https://github.com/apache/iceberg/pull/2286) for the vectorized Arrow reader.

This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the PR.

I've refactored some code from the Spark vectorized reader and added an ArrowReader, which is a vectorized reader in Iceberg core.

About the ArrowReader:

1. I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.

2. I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data module. Specifically for the ArrowReaderTest, I had to add the following. Let me know if there is a better way of doing this.

    compileOnly("org.apache.arrow:arrow-vector") {
      exclude group: 'io.netty', module: 'netty-buffer'
      exclude group: 'com.google.code.findbugs', module: 'jsr305'
    }

3. Most of the code in ArrowReader is taken from the Spark vectorized reader. I think there is potential to share ‘BaseArrowFileScanTaskReader’ between both versions, but I have not attempted it yet.

4. ArrowReader returns an iterator of VectorSchemaRoot, and the behavior is explained in the Javadoc.

5. Some small changes were needed in IcebergGenerics to expose the table scan object, and in VectorizedArrowReader to allocate different timestamp Arrow vectors depending on whether the type is with or without timezone.

6. All prepush Gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).

7. I've not run any performance tests on the implementation yet. I'm planning to do so this week.

Following are some limitations/questions for this implementation:

1. The Arrow vector type is coupled with the physical data type in the Parquet file: When column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test, ArrowReaderTest.testReadAllWithConstantRecords(), that is currently ignored.

2. Type promotion does not work: In the ArrowReaderTest, the data for column ‘int_promotion’ was written as int, and then the type was promoted from int to long, but the Arrow reader still returns IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.

3. Data type limitations:

   a. Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the Parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to Parquet?

   b. Types not supported: TimeType, ListType, MapType, StructType.
   What is the path to add Arrow support for these data types?

Thanks,
Mayur

From: Mayur Srivastava <mayur.srivast...@twosigma.com>
Sent: Friday, February 12, 2021 7:41 PM
To: dev@iceberg.apache.org; rb...@netflix.com
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thank you Ryan.

I’ll dig into the file scan plan and the Spark codebase to learn about the internals of the Iceberg vectorized read path. Then, I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back upstream. I’ll get back to you if I have any questions or need any more pointers.

Thanks,
Mayur

From: Ryan Blue <rb...@netflix.com.INVALID>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <dev@iceberg.apache.org>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than a generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.

The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas. So you'd still need to fall back to the row-based path for nested schemas.
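
A minimal sketch of that fallback, assuming a deliberately simplified schema model: the `Kind` and `ReadPath` enums and the `ReadPathChooser` class are hypothetical names for illustration only and are not part of the Iceberg or Arrow APIs. It only shows the decision itself: take the Arrow batch path when every column is a primitive, and fall back to rows otherwise.

```java
import java.util.List;

// Hypothetical, simplified model of a schema; this is NOT Iceberg's Schema or Type API.
final class ReadPathChooser {
    // Coarse classification of a top-level column (assumption for illustration).
    enum Kind { PRIMITIVE, STRUCT, LIST, MAP }

    // The two read paths discussed in the thread.
    enum ReadPath { ARROW_BATCH, ROW_BASED }

    // A schema counts as "flat" here when every top-level column is a primitive.
    static boolean isFlat(List<Kind> columnKinds) {
        return columnKinds.stream().allMatch(kind -> kind == Kind.PRIMITIVE);
    }

    // Use the vectorized path only for flat schemas; otherwise fall back to rows.
    static ReadPath choose(List<Kind> columnKinds) {
        return isFlat(columnKinds) ? ReadPath.ARROW_BATCH : ReadPath.ROW_BASED;
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of(Kind.PRIMITIVE, Kind.PRIMITIVE))); // ARROW_BATCH
        System.out.println(choose(List.of(Kind.PRIMITIVE, Kind.LIST)));      // ROW_BASED
    }
}
```

A real reader would presumably derive the classification from the table's projected schema rather than from a hand-built list.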

(Side note: if you have code to convert generics to Arrow, that's really useful to post somewhere.)

I hope that helps. It would be great to work with you to improve this in a couple of PRs!

rb

On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

Hi,

We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) from a bespoke table backend based on S3-compatible storage. We are trying to replace our bespoke table backend with Iceberg tables. For integrating with Iceberg, we are using the Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data.

In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row by row. Instead of this (potentially slower) read path, which needs conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot as a callback or Arrow record batches as the result set.

I have noticed that Iceberg already has an Arrow module: https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But I’m not sure about the current status of the vectorized reader support. I’m also not sure how this Arrow module is being used to perform a vectorized read to execute a query on an Iceberg table in the core/data/parquet library.

I have a few questions regarding the vectorized reader/Arrow support:

1. Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java?

2. Is there an example of reading data in Arrow format from an Iceberg table?

3. Is the Spark read path completely vectorized? I ask this to find out whether we can borrow from the vectorized Spark reader or move code from the vectorized Spark reader to the Iceberg core library.

Let me know if you have any questions for me.

Thanks,
Mayur

--
Ryan Blue
Software Engineer
Netflix
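
The `readUsingBatch` condition quoted earlier in this thread can be tried in isolation. The sketch below copies that boolean expression into a standalone method, assuming the six flags are plain booleans supplied by the caller; the class name `BatchReadDecision` and the method signature are hypothetical and not part of Iceberg. It shows why any delete file currently forces the row-based path.

```java
// Hypothetical wrapper around the condition quoted in the thread; not an Iceberg class.
final class BatchReadDecision {
    // Returns true only when batch (vectorized) reads are enabled, no delete files
    // are present, and the files are either all ORC or all Parquet with at least
    // one column and only primitive types.
    static boolean readUsingBatch(
            boolean batchReadsEnabled,
            boolean hasNoDeleteFiles,
            boolean allOrcFileScanTasks,
            boolean allParquetFileScanTasks,
            boolean atLeastOneColumn,
            boolean onlyPrimitives) {
        return batchReadsEnabled
                && hasNoDeleteFiles
                && (allOrcFileScanTasks
                    || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
    }

    public static void main(String[] args) {
        // Parquet, flat schema, no deletes: the batch path is used.
        System.out.println(readUsingBatch(true, true, false, true, true, true));   // true
        // Same scan, but with delete files: falls back to the row-based path.
        System.out.println(readUsingBatch(true, false, false, true, true, true));  // false
        // Parquet with non-primitive columns: falls back to the row-based path.
        System.out.println(readUsingBatch(true, true, false, true, true, false));  // false
        // All ORC tasks, no deletes: the batch path is used.
        System.out.println(readUsingBatch(true, true, true, false, false, false)); // true
    }
}
```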