Hi Mayur,

Playing around with the idea of implementing vectorized reads for Hive so your 
message come just in time :)

Took a quick look at the code but I do not really understand how vectorized 
reads handle deletes.

In non-vectorized code-path I have found this which filters the rows one-by-one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277
 
<https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277>

          DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, 
tableSchema, readSchema);
          Schema requiredSchema = deletes.requiredSchema();
          return deletes.filter(openTask(currentTask, requiredSchema));

In your code I have found that the delete files encryption keys are collected, 
but not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59
 
<https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59>

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), 
fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), 
file.keyMetadata()));

Could you please help me with some quick pointers?

Thanks,
Peter

> On Mar 1, 2021, at 16:17, Mayur Srivastava <mayur.srivast...@twosigma.com> 
> wrote:
> 
> Hi Ryan,
>  
> I’ve submitted a pr (https://github.com/apache/iceberg/pull/2286 
> <https://github.com/apache/iceberg/pull/2286>) for the vectorized arrow 
> reader:
>  
> This is my first Iceberg pull request - I'm not fully aware of the 
> contributing conventions of this repo, so let me know if any changes are 
> needed in the pr.
> I've refactored some code from the Spark vectorized reader and added an 
> ArrowReader which is a vectorized reader in Iceberg core.
>  
> About the ArrowReader:
> 1.      I’ve put the ArrowReader in the iceberg-data module because it needed 
> to access the Iceberg table scan. Let me know if the reader needs to be moved.
> 2.      I had to make a dependency addition of ‘iceberg-arrow’ for the 
> iceberg-data module. Specially for the ArrowReaderTest, I had to add the 
> following. Let me know if there is a better way for doing this.
> compileOnly("org.apache.arrow:arrow-vector") {
> exclude group: 'io.netty', module: 'netty-buffer'
> exclude group: 'com.google.code.findbugs', module: 'jsr305'
> }
> 3.      Most of the code in ArrowReader is taken from the spark vectorized 
> reader. I think there is a potential to share ‘BaseArrowFileScanTaskReader’ 
> in both versions, but I did not attempt to do it yet.
> 4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior 
> is explained in the Javadoc.
> 5.      Some small changes were needed in IcebergGenerics to expose the table 
> scan object and VectorizedArrowReader to allocate different timestamp Arrow 
> vectors based on with/without timezone.
> 6.      All prepush gradle tests pass except one which is still running (and 
> it seems very slow - TestFlinkIcebergSink).
> 7.      I've not performed any performance tests with the implementation yet. 
> I'm planning to do so this week.
>  
> Following are some limitations/questions for this implementation:
> 1.      The arrow vector type is coupled with the physical data type in the 
> parquet file: When column data contains a constant value, the column is 
> dictionary encoded and the returned Arrow type is int32 irrespective of the 
> Iceberg data type. I think that the Arrow vector type should be consistent 
> with the logical Iceberg data type (and not change due to the physical data 
> type). There is a test ArrowReaderTest.testReadAllWithConstantRecords() that 
> is currently ignored.
> 2.      Type promotion does not work: In the ArrowReaderTest, the data for 
> column ‘int_promotion’ was written as int, and then type was promoted from 
> int to long, but the Arrow reader still returns IntVector. I think that the 
> Arrow vector type should be consistent with the promoted logical Iceberg data 
> type.
> 3.      Data type limitations:
> a.      Types not tested: UUID, FixedType, DecimalType. In the 
> ArrowReaderTest, the parquet write was failing for these data types (due to a 
> null pointer exception in ParquetMetadataConverter.addRowGroup: 
> columnMetadata.getStatistics() was null). Are there unit tests with these 
> types that write to parquet?
> b.      Types not supported: TimeType, ListType, MapType, StructType. What is 
> the path to add Arrow support for these data types?
>  
> Thanks,
> Mayur
>  
> From: Mayur Srivastava <mayur.srivast...@twosigma.com> 
> Sent: Friday, February 12, 2021 7:41 PM
> To: dev@iceberg.apache.org; rb...@netflix.com
> Subject: RE: Reading data from Iceberg table into Apache Arrow in Java
>  
> Thank you Ryan.
>  
> I’ll dig into the file scan plan and Spark codebase to learn about the 
> internals of Iceberg vectorized read path. Then, I’ll try to implement the 
> vectorized reader using core components only. I’ll be happy to work with you 
> to contribute it back to the upstream. I’ll get back to you if I’ve any 
> question or need any more pointers.
>  
> Thanks,
> Mayur
>  
> From: Ryan Blue <rb...@netflix.com.INVALID 
> <mailto:rb...@netflix.com.INVALID>> 
> Sent: Friday, February 12, 2021 2:26 PM
> To: Iceberg Dev List <dev@iceberg.apache.org <mailto:dev@iceberg.apache.org>>
> Subject: Re: Reading data from Iceberg table into Apache Arrow in Java
>  
> Hi Mayur,
>  
> We built the Arrow support with Spark as the first use case, so the best 
> examples of how to use it are in Spark.
>  
> The generic reader does two things: it plans a scan and sets up an iterator 
> of file readers to produce generic records. What you want to do is the same 
> thing, but set up the file readers to produce Arrow batches. You can do that 
> by changing the `Parquet.read` call and passing the callback to create an 
> Arrow batch reader rather than generic row reader. I don't think there is a 
> public example of this, but maybe someone else knows about one. This isn't 
> available in Iceberg yet, but if you want to add it we'd be happy to help you 
> get it in.
>  
> The Spark read path has a good example, but it also wraps the Arrow batches 
> so Spark can read them. Also, keep in mind that the Arrow integration only 
> supports flat schemas right now, not fully nested schemas. So you'd need to 
> still fall back to the row-based path. (Side note, if you have code to 
> convert generics to Arrow, that's really useful to post somewhere.)
>  
> I hope that helps. It would be great to work with you to improve this in a 
> couple of PRs!
>  
> rb
>  
> On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava 
> <mayur.srivast...@twosigma.com <mailto:mayur.srivast...@twosigma.com>> wrote:
> Hi,
>  
> We have an existing time series data access service based on Arrow/Flight 
> which uses Apache Arrow format data to perform writes and reads (using time 
> range queries) from a bespoke table-backend based on a S3 compatible storage. 
>  
> We are trying to replace our bespoke table-backend with Iceberg tables. For 
> integrating with Iceberg, we are using Iceberg core+data+parquet modules 
> directly to write and read data. I would like to note that our service cannot 
> use the Spark route to write or read the data. In our current Iceberg reader 
> integration code, we are using 
> IcebergGenerics.read(table).select(...).where(...).build() to iterate through 
> the data row-by-row. Instead of this (potentially slower) read path which 
> needs conversion between rows and Arrow VectorSchemaRoot, we want to use a 
> vectorized read path which directly returns an Arrow VectorSchemaRoot as a 
> callback or Arrow record batches as the result set. 
>  
> I have noticed that Iceberg already has an Arrow 
> modulehttps://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow
>  
> <https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow>.
>  I  have also looked into https://github.com/apache/iceberg/issues/9 
> <https://github.com/apache/iceberg/issues/9> and 
> https://github.com/apache/iceberg/milestone/2 
> <https://github.com/apache/iceberg/milestone/2>. But, I’m not sure about the 
> current status of the vectorized reader support. I’m also not sure how this 
> Arrow module is being used to perform a vectorized read to execute a query on 
> an Iceberg table in the core/data/parquet library.
>  
> I have a few questions regarding the Vectorized reader/Arrow support:
> 1.      Is it possible to run a vectorized read on an Iceberg table to return 
> data in Arrow format using a non-Spark reader in Java?
> 2.      Is there an example of reading data in Arrow format from an Iceberg 
> table?
> 3.      Is the Spark read path completely vectorized? I ask this question to 
> find out if we can borrow from the vectorized Spark reader or we can move 
> code from vectorized Spark reader to the Iceberg core library.
>  
> Let me know if you have any questions for me.
>  
> Thanks,
> Mayur
>  
> 
>  
> -- 
> Ryan Blue
> Software Engineer
> Netflix

Reply via email to