>> Should we proceed with this pr and later add support for vectorized reads in 
>> a separate pr?
I meant support deletes in the vectorized reader.

Thanks,
Mayur

From: Mayur Srivastava <mayur.srivast...@twosigma.com>
Sent: Wednesday, March 3, 2021 6:41 AM
To: dev@iceberg.apache.org
Cc: Ryan Blue <rb...@netflix.com>
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thanks for finding out Peter.

Should we proceed with this pr and later add support for vectorized reads in a 
separate pr?
There are also some other limitations in the current pr (listed in the pr) 
which could be addressed in subsequent prs.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID<mailto:pv...@cloudera.com.INVALID>>
Sent: Tuesday, March 2, 2021 10:38 AM
To: Iceberg Dev List <dev@iceberg.apache.org<mailto:dev@iceberg.apache.org>>
Cc: Ryan Blue <rb...@netflix.com<mailto:rb...@netflix.com>>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

After some more digging, I think there is no solution yet for vectorized reads 
with deletes.

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && 
(allOrcFileScanTasks ||
     (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));


Mayur Srivastava 
<mayur.srivast...@twosigma.com<mailto:mayur.srivast...@twosigma.com>> ezt írta 
(időpont: 2021. márc. 2., Ke 15:48):
Hi Peter,

Good point.

Most of the ArrowReader code is inspired from the Spark’s vectorized reader 
(src: 
https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68).


We’ll probably need some help from someone who understands the Spark vectorized 
read path.

But, I’ll read the code to understand the deletes.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID<mailto:pv...@cloudera.com.INVALID>>
Sent: Tuesday, March 2, 2021 8:51 AM
To: Iceberg Dev List <dev@iceberg.apache.org<mailto:dev@iceberg.apache.org>>
Cc: rb...@netflix.com<mailto:rb...@netflix.com>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

Playing around with the idea of implementing vectorized reads for Hive so your 
message come just in time :)

Took a quick look at the code but I do not really understand how vectorized 
reads handle deletes.

In non-vectorized code-path I have found this which filters the rows one-by-one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

          DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, 
tableSchema, readSchema);
          Schema requiredSchema = deletes.requiredSchema();
          return deletes.filter(openTask(currentTask, requiredSchema));

In your code I have found that the delete files encryption keys are collected, 
but not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), 
fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), 
file.keyMetadata()));

Could you please help me with some quick pointers?

Thanks,
Peter

On Mar 1, 2021, at 16:17, Mayur Srivastava 
<mayur.srivast...@twosigma.com<mailto:mayur.srivast...@twosigma.com>> wrote:

Hi Ryan,

I’ve submitted a pr (https://github.com/apache/iceberg/pull/2286) for the 
vectorized arrow reader:

This is my first Iceberg pull request - I'm not fully aware of the contributing 
conventions of this repo, so let me know if any changes are needed in the pr.
I've refactored some code from the Spark vectorized reader and added an 
ArrowReader which is a vectorized reader in Iceberg core.

About the ArrowReader:
1.      I’ve put the ArrowReader in the iceberg-data module because it needed 
to access the Iceberg table scan. Let me know if the reader needs to be moved.
2.      I had to make a dependency addition of ‘iceberg-arrow’ for the 
iceberg-data module. Specially for the ArrowReaderTest, I had to add the 
following. Let me know if there is a better way for doing this.
compileOnly("org.apache.arrow:arrow-vector") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
3.      Most of the code in ArrowReader is taken from the spark vectorized 
reader. I think there is a potential to share ‘BaseArrowFileScanTaskReader’ in 
both versions, but I did not attempt to do it yet.
4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior is 
explained in the Javadoc.
5.      Some small changes were needed in IcebergGenerics to expose the table 
scan object and VectorizedArrowReader to allocate different timestamp Arrow 
vectors based on with/without timezone.
6.      All prepush gradle tests pass except one which is still running (and it 
seems very slow - TestFlinkIcebergSink).
7.      I've not performed any performance tests with the implementation yet. 
I'm planning to do so this week.

Following are some limitations/questions for this implementation:
1.      The arrow vector type is coupled with the physical data type in the 
parquet file: When column data contains a constant value, the column is 
dictionary encoded and the returned Arrow type is int32 irrespective of the 
Iceberg data type. I think that the Arrow vector type should be consistent with 
the logical Iceberg data type (and not change due to the physical data type). 
There is a test ArrowReaderTest.testReadAllWithConstantRecords() that is 
currently ignored.
2.      Type promotion does not work: In the ArrowReaderTest, the data for 
column ‘int_promotion’ was written as int, and then type was promoted from int 
to long, but the Arrow reader still returns IntVector. I think that the Arrow 
vector type should be consistent with the promoted logical Iceberg data type.
3.      Data type limitations:
a.      Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, 
the parquet write was failing for these data types (due to a null pointer 
exception in ParquetMetadataConverter.addRowGroup: 
columnMetadata.getStatistics() was null). Are there unit tests with these types 
that write to parquet?
b.      Types not supported: TimeType, ListType, MapType, StructType. What is 
the path to add Arrow support for these data types?

Thanks,
Mayur

From: Mayur Srivastava 
<mayur.srivast...@twosigma.com<mailto:mayur.srivast...@twosigma.com>>
Sent: Friday, February 12, 2021 7:41 PM
To: dev@iceberg.apache.org<mailto:dev@iceberg.apache.org>; 
rb...@netflix.com<mailto:rb...@netflix.com>
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thank you Ryan.

I’ll dig into the file scan plan and Spark codebase to learn about the 
internals of Iceberg vectorized read path. Then, I’ll try to implement the 
vectorized reader using core components only. I’ll be happy to work with you to 
contribute it back to the upstream. I’ll get back to you if I’ve any question 
or need any more pointers.

Thanks,
Mayur

From: Ryan Blue <rb...@netflix.com.INVALID<mailto:rb...@netflix.com.INVALID>>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <dev@iceberg.apache.org<mailto:dev@iceberg.apache.org>>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best 
examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator of 
file readers to produce generic records. What you want to do is the same thing, 
but set up the file readers to produce Arrow batches. You can do that by 
changing the `Parquet.read` call and passing the callback to create an Arrow 
batch reader rather than generic row reader. I don't think there is a public 
example of this, but maybe someone else knows about one. This isn't available 
in Iceberg yet, but if you want to add it we'd be happy to help you get it in.

The Spark read path has a good example, but it also wraps the Arrow batches so 
Spark can read them. Also, keep in mind that the Arrow integration only 
supports flat schemas right now, not fully nested schemas. So you'd need to 
still fall back to the row-based path. (Side note, if you have code to convert 
generics to Arrow, that's really useful to post somewhere.)

I hope that helps. It would be great to work with you to improve this in a 
couple of PRs!

rb

On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava 
<mayur.srivast...@twosigma.com<mailto:mayur.srivast...@twosigma.com>> wrote:
Hi,

We have an existing time series data access service based on Arrow/Flight which 
uses Apache Arrow format data to perform writes and reads (using time range 
queries) from a bespoke table-backend based on a S3 compatible storage.

We are trying to replace our bespoke table-backend with Iceberg tables. For 
integrating with Iceberg, we are using Iceberg core+data+parquet modules 
directly to write and read data. I would like to note that our service cannot 
use the Spark route to write or read the data. In our current Iceberg reader 
integration code, we are using 
IcebergGenerics.read(table).select(...).where(...).build() to iterate through 
the data row-by-row. Instead of this (potentially slower) read path which needs 
conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized 
read path which directly returns an Arrow VectorSchemaRoot as a callback or 
Arrow record batches as the result set.

I have noticed that Iceberg already has an Arrow 
modulehttps://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow.
 I  have also looked into https://github.com/apache/iceberg/issues/9 and 
https://github.com/apache/iceberg/milestone/2. But, I’m not sure about the 
current status of the vectorized reader support. I’m also not sure how this 
Arrow module is being used to perform a vectorized read to execute a query on 
an Iceberg table in the core/data/parquet library.

I have a few questions regarding the Vectorized reader/Arrow support:
1.      Is it possible to run a vectorized read on an Iceberg table to return 
data in Arrow format using a non-Spark reader in Java?
2.      Is there an example of reading data in Arrow format from an Iceberg 
table?
3.      Is the Spark read path completely vectorized? I ask this question to 
find out if we can borrow from the vectorized Spark reader or we can move code 
from vectorized Spark reader to the Iceberg core library.

Let me know if you have any questions for me.

Thanks,
Mayur



--
Ryan Blue
Software Engineer
Netflix

Reply via email to