Hi Ryan,
I’ve submitted a PR (https://github.com/apache/iceberg/pull/2286) for the
vectorized Arrow reader.
This is my first Iceberg pull request, and I'm not fully aware of the contributing
conventions of this repo, so let me know if any changes are needed in the PR.
I've refactored some code from the Spark vectorized reader and added an
ArrowReader which is a vectorized reader in Iceberg core.
About the ArrowReader:
1. I’ve put the ArrowReader in the iceberg-data module because it needed
to access the Iceberg table scan. Let me know if the reader needs to be moved.
2. I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data module.
Specifically, for the ArrowReaderTest, I had to add the following. Let me know if
there is a better way of doing this.
compileOnly("org.apache.arrow:arrow-vector") {
    exclude group: 'io.netty', module: 'netty-buffer'
    exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
3. Most of the code in ArrowReader is taken from the Spark vectorized
reader. I think there is potential to share a ‘BaseArrowFileScanTaskReader’
between the two versions, but I have not attempted that yet.
4. ArrowReader returns an iterator of VectorSchemaRoot; the behavior is
explained in the Javadoc, and a minimal usage sketch follows this list.
5. Some small changes were needed in IcebergGenerics to expose the table
scan object, and in VectorizedArrowReader to allocate different timestamp Arrow
vectors depending on whether the type is with or without timezone.
6. All pre-push Gradle tests pass except one that is still running (and it
seems very slow: TestFlinkIcebergSink).
7. I've not performed any performance tests with the implementation yet.
I'm planning to do so this week.
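To make (4) concrete, here is a minimal usage sketch. The ArrowReader constructor
and the batch-size argument are illustrative and may change based on review; the
column names are placeholders, and 'table' is an org.apache.iceberg.Table obtained
elsewhere.

import java.util.Iterator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.iceberg.TableScan;

// Illustrative usage only; ArrowReader is the new class from this PR.
TableScan scan = table.newScan().select("event_time", "value");
try (ArrowReader reader = new ArrowReader(scan, 1024 /* max rows per Arrow batch */)) {
  Iterator<VectorSchemaRoot> batches = reader.iterator();
  while (batches.hasNext()) {
    VectorSchemaRoot root = batches.next();  // one VectorSchemaRoot per batch
    // read the columns in this batch, e.g. root.getVector("value")
  }
}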
Following are some limitations/questions for this implementation:
1. The Arrow vector type is coupled with the physical data type in the
Parquet file: when column data contains a constant value, the column is
dictionary-encoded and the returned Arrow type is int32 irrespective of the
Iceberg data type. I think that the Arrow vector type should be consistent with
the logical Iceberg data type (and not change due to the physical data type).
There is a test, ArrowReaderTest.testReadAllWithConstantRecords(), that is
currently ignored.
2. Type promotion does not work: in the ArrowReaderTest, the data for
column ‘int_promotion’ was written as int, and the type was then promoted from
int to long, but the Arrow reader still returns an IntVector. I think that the
Arrow vector type should be consistent with the promoted logical Iceberg data
type (a sketch of this expectation follows the list).
3. Data type limitations:
a. Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest,
the Parquet write was failing for these data types (due to a null pointer
exception in ParquetMetadataConverter.addRowGroup:
columnMetadata.getStatistics() was null). Are there unit tests with these types
that write to Parquet?
b. Types not supported: TimeType, ListType, MapType, StructType. What is
the path to add Arrow support for these data types?
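To make the expectation in (1) and (2) concrete, the kind of JUnit-style check I
have in mind is below ('root' is a VectorSchemaRoot returned by the reader, and
the column name comes from the test scenario in (2)):

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.junit.Assert;

// 'int_promotion' was written as int and then promoted to long in the table
// schema, so the reader should hand back a BigIntVector (Arrow's 64-bit int
// vector), not an IntVector.
FieldVector promoted = root.getVector("int_promotion");
Assert.assertTrue(
    "vector type should follow the promoted Iceberg type (long)",
    promoted instanceof BigIntVector);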
Thanks,
Mayur
From: Mayur Srivastava <[email protected]>
Sent: Friday, February 12, 2021 7:41 PM
To: [email protected]; [email protected]
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java
Thank you Ryan.
I’ll dig into the file scan plan and the Spark codebase to learn about the
internals of the Iceberg vectorized read path. Then I’ll try to implement the
vectorized reader using core components only. I’ll be happy to work with you to
contribute it back upstream. I’ll get back to you if I have any questions or
need any more pointers.
Thanks,
Mayur
From: Ryan Blue <[email protected]<mailto:[email protected]>>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <[email protected]<mailto:[email protected]>>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java
Hi Mayur,
We built the Arrow support with Spark as the first use case, so the best
examples of how to use it are in Spark.
The generic reader does two things: it plans a scan and sets up an iterator of
file readers to produce generic records. What you want to do is the same thing,
but set up the file readers to produce Arrow batches. You can do that by
changing the `Parquet.read` call and passing a callback that creates an Arrow
batch reader rather than a generic row reader. I don't think there is a public
example of this, but maybe someone else knows about one. This isn't available
in Iceberg yet, but if you want to add it we'd be happy to help you get it in.
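Roughly, that change looks something like the sketch below. This is not a final
API: `createBatchedReaderFunc` mirrors what the Spark vectorized path uses today,
and `buildArrowBatchReader` is just a placeholder for the Arrow batch reader
factory you would add; 'table', 'task' (a FileScanTask), and 'expectedSchema'
come from scan planning.

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

// Sketch: produce Arrow batches instead of generic rows for one FileScanTask.
InputFile input = table.io().newInputFile(task.file().path().toString());
CloseableIterable<VectorSchemaRoot> batches = Parquet.read(input)
    .project(expectedSchema)
    .split(task.start(), task.length())
    .filter(task.residual())
    .createBatchedReaderFunc(fileSchema ->
        buildArrowBatchReader(expectedSchema, fileSchema))  // placeholder factory
    .build();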
The Spark read path has a good example, but it also wraps the Arrow batches so
Spark can read them. Also, keep in mind that the Arrow integration only
supports flat schemas right now, not fully nested schemas. So you'd still need
to fall back to the row-based path. (Side note, if you have code to convert
generics to Arrow, that's really useful to post somewhere.)
I hope that helps. It would be great to work with you to improve this in a
couple of PRs!
rb
On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava
<[email protected]<mailto:[email protected]>> wrote:
Hi,
We have an existing time-series data access service based on Arrow/Flight, which
uses Apache Arrow format data to perform writes and reads (using time-range
queries) against a bespoke table backend built on S3-compatible storage.
We are trying to replace our bespoke table backend with Iceberg tables. To
integrate with Iceberg, we are using the Iceberg core+data+parquet modules
directly to write and read data. I would like to note that our service cannot
use the Spark route to write or read the data. In our current Iceberg reader
integration code, we are using
IcebergGenerics.read(table).select(...).where(...).build() to iterate through
the data row by row. Instead of this (potentially slower) read path, which
requires conversion between rows and Arrow VectorSchemaRoot, we want to use a
vectorized read path that directly returns Arrow data, either through a
VectorSchemaRoot callback or as Arrow record batches in the result set.
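For reference, our current row-based read looks roughly like the snippet below
(column names and predicate values are placeholders); each Record then has to be
copied into Arrow vectors, which is the conversion we would like to avoid:

import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

// Current row-by-row read path (sketch); 'table' is the Iceberg Table we query,
// and startMicros/endMicros are the bounds of the time-range query.
try (CloseableIterable<Record> rows = IcebergGenerics.read(table)
        .select("event_time", "value")
        .where(Expressions.and(
            Expressions.greaterThanOrEqual("event_time", startMicros),
            Expressions.lessThan("event_time", endMicros)))
        .build()) {
  for (Record row : rows) {
    // copy fields into Arrow vectors / a VectorSchemaRoot here
  }
}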
I have noticed that Iceberg already has an Arrow module
https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow.
I have also looked into https://github.com/apache/iceberg/issues/9 and
https://github.com/apache/iceberg/milestone/2. But I’m not sure about the
current status of the vectorized reader support. I’m also not sure how this
Arrow module is being used to perform a vectorized read when executing a query
on an Iceberg table using the core/data/parquet libraries.
I have a few questions regarding the Vectorized reader/Arrow support:
1. Is it possible to run a vectorized read on an Iceberg table to return
data in Arrow format using a non-Spark reader in Java?
2. Is there an example of reading data in Arrow format from an Iceberg
table?
3. Is the Spark read path completely vectorized? I ask this question to
find out whether we can borrow from the vectorized Spark reader or move code
from the vectorized Spark reader into the Iceberg core library.
Let me know if you have any questions for me.
Thanks,
Mayur
--
Ryan Blue
Software Engineer
Netflix