Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Gautam
;>>>>>>>
>>>>>>>> Is there a config/plugin I need to add to build.gradle?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Gautam!
>>>>>>>>>
>>>>>>>>> We'll start taking a look at your code. What do you think about
>>>>>>>>> creating a branch in the Iceberg repository where we can work on 
>>>>>>>>> improving
>>>>>>>>> it together, before merging it into master?
>>>>>>>>>
>>>>>>>>> Also, you mentioned performance comparisons. Do you have any early
>>>>>>>>> results to share?
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>>
>>>>>>>>> On Tue, Jul 23, 2019 at 3:40 PM Gautam 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Folks,
>>>>>>>>>>
>>>>>>>>>> I have checked in a WIP branch [1] with a working version of
>>>>>>>>>> Vectorized reads for Iceberg reader. Here's the diff  [2].
>>>>>>>>>>
>>>>>>>>>> *Implementation Notes:*
>>>>>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` 
>>>>>>>>>> instead of
>>>>>>>>>> the usual `planInputPartitions()`. It returns instances of 
>>>>>>>>>> `ColumnarBatch`
>>>>>>>>>> on each iteration.
>>>>>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion.
>>>>>>>>>> This was copied from [3] . Added by @Daniel Weeks
>>>>>>>>>>  . Thanks for that!
>>>>>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders
>>>>>>>>>> used for reading/decoding the Parquet rowgroups (aka pagestores as 
>>>>>>>>>> referred
>>>>>>>>>> to in the code)
>>>>>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>>>>> properly. This has the added benefit of vectorization support for 
>>>>>>>>>> structs,
>>>>>>>>>> so yay!
>>>>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>>>>> tuning for right batch sizing but i'v gone with one batch per 
>>>>>>>>>> rowgroup for
>>>>>>>>>> now.
>>>>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector`
>>>>>>>>>> which is Spark's ColumnVector implementation backed by Arrow. This 
>>>>>>>>>> is the
>>>>>>>>>> first contact point between Spark and Arrow interfaces.
>>>>>>>>>>  - ArrowColumnVectors are stitched together into a
>>>>>>>>>> `ColumnarBatch` by `ColumnarBatchReader` . This is my replacement for
>>>>>>>>>> `InternalRowReader` which maps Structs to Columnar Batches. This 
>>>>>>>>>> allows us
>>>>>>>>>> to have nested structs where each level of nesting would be a nested
>>>>>>>>>> columnar batch. Lemme know what you think of this approach.
>>>>>>>>>>  - I'v added value readers for all supported primitive types
>>>>>>>>>> listed in `AvroDataTest`. There's a correspond

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Anjali Norwood
gt;>>>>>>> *Implementation Notes:*
>>>>>>>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to
>>>>>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` 
>>>>>>>>> instead of
>>>>>>>>> the usual `planInputPartitions()`. It returns instances of 
>>>>>>>>> `ColumnarBatch`
>>>>>>>>> on each iteration.
>>>>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion.
>>>>>>>>> This was copied from [3] . Added by @Daniel Weeks
>>>>>>>>>  . Thanks for that!
>>>>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders
>>>>>>>>> used for reading/decoding the Parquet rowgroups (aka pagestores as 
>>>>>>>>> referred
>>>>>>>>> to in the code)
>>>>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>>>> properly. This has the added benefit of vectorization support for 
>>>>>>>>> structs,
>>>>>>>>> so yay!
>>>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>>>> tuning for right batch sizing but i'v gone with one batch per 
>>>>>>>>> rowgroup for
>>>>>>>>> now.
>>>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the 
>>>>>>>>> first
>>>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>>>> by `ColumnarBatchReader` . This is my replacement for 
>>>>>>>>> `InternalRowReader`
>>>>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>>>>> structs where each level of nesting would be a nested columnar batch. 
>>>>>>>>> Lemme
>>>>>>>>> know what you think of this approach.
>>>>>>>>>  - I'v added value readers for all supported primitive types
>>>>>>>>> listed in `AvroDataTest`. There's a corresponding test for vectorized
>>>>>>>>> reader under `TestSparkParquetVectorizedReader`
>>>>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to
>>>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>>>> sorry! :-(
>>>>>>>>>
>>>>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore
>>>>>>>>> this as it's not used. This was from my previous impl of 
>>>>>>>>> Vectorization. I'v
>>>>>>>>> kept it around to compare performance.
>>>>>>>>>
>>>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>>>> working for our scale test benchmark and will report back with 
>>>>>>>>> numbers.
>>>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1] -
>>>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>>>> [2] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>>>> [3] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Gautam
>>>>> instruct the DataSourceV2ScanExec to use `planBatchPartitions()` 
>>>>>>>>> instead of
>>>>>>>>> the usual `planInputPartitions()`. It returns instances of 
>>>>>>>>> `ColumnarBatch`
>>>>>>>>> on each iteration.
>>>>>>>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion.
>>>>>>>>> This was copied from [3] . Added by @Daniel Weeks
>>>>>>>>>  . Thanks for that!
>>>>>>>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders
>>>>>>>>> used for reading/decoding the Parquet rowgroups (aka pagestores as 
>>>>>>>>> referred
>>>>>>>>> to in the code)
>>>>>>>>>  - `VectorizedSparkParquetReaders` contains the visitor
>>>>>>>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>>>> properly. This has the added benefit of vectorization support for 
>>>>>>>>> structs,
>>>>>>>>> so yay!
>>>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>>>> tuning for right batch sizing but i'v gone with one batch per 
>>>>>>>>> rowgroup for
>>>>>>>>> now.
>>>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the 
>>>>>>>>> first
>>>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>>>> by `ColumnarBatchReader` . This is my replacement for 
>>>>>>>>> `InternalRowReader`
>>>>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>>>>> structs where each level of nesting would be a nested columnar batch. 
>>>>>>>>> Lemme
>>>>>>>>> know what you think of this approach.
>>>>>>>>>  - I'v added value readers for all supported primitive types
>>>>>>>>> listed in `AvroDataTest`. There's a corresponding test for vectorized
>>>>>>>>> reader under `TestSparkParquetVectorizedReader`
>>>>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to
>>>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>>>> sorry! :-(
>>>>>>>>>
>>>>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore
>>>>>>>>> this as it's not used. This was from my previous impl of 
>>>>>>>>> Vectorization. I'v
>>>>>>>>> kept it around to compare performance.
>>>>>>>>>
>>>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>>>> working for our scale test benchmark and will report back with 
>>>>>>>>> numbers.
>>>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1] -
>>>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>>>> [2] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>>>> [3] -
>>>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>>>>
>

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-31 Thread Gautam
gt;> properly. This has the added benefit of vectorization support for 
>>>>>>>> structs,
>>>>>>>> so yay!
>>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>>> tuning for right batch sizing but i'v gone with one batch per rowgroup 
>>>>>>>> for
>>>>>>>> now.
>>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the 
>>>>>>>> first
>>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>>> by `ColumnarBatchReader` . This is my replacement for 
>>>>>>>> `InternalRowReader`
>>>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>>>> structs where each level of nesting would be a nested columnar batch. 
>>>>>>>> Lemme
>>>>>>>> know what you think of this approach.
>>>>>>>>  - I'v added value readers for all supported primitive types listed
>>>>>>>> in `AvroDataTest`. There's a corresponding test for vectorized reader 
>>>>>>>> under
>>>>>>>> `TestSparkParquetVectorizedReader`
>>>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to
>>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>>> sorry! :-(
>>>>>>>>
>>>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore
>>>>>>>> this as it's not used. This was from my previous impl of 
>>>>>>>> Vectorization. I'v
>>>>>>>> kept it around to compare performance.
>>>>>>>>
>>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>>> working for our scale test benchmark and will report back with numbers.
>>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Gautam.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] -
>>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>>> [2] -
>>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>>> [3] -
>>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>>>> more primitive type support.
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>>>> source?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *From: *Gautam 
>>>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>>>> *To: *Daniel Weeks 
>>>>>>>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>>>>>>>> dev@iceberg.apache.org>
>>>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>>>
>>>>>>>>>>
>>>&

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-30 Thread Samarth Jain
t;>>> implementations to map Parquet types to appropriate value readers. I
>>>>>>> implemented the struct visitor so that the root schema can be mapped
>>>>>>> properly. This has the added benefit of vectorization support for 
>>>>>>> structs,
>>>>>>> so yay!
>>>>>>>  - For the initial version the value readers read an entire row
>>>>>>> group into a single Arrow Field Vector. this i'd imagine will require
>>>>>>> tuning for right batch sizing but i'v gone with one batch per rowgroup 
>>>>>>> for
>>>>>>> now.
>>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which
>>>>>>> is Spark's ColumnVector implementation backed by Arrow. This is the 
>>>>>>> first
>>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch`
>>>>>>> by `ColumnarBatchReader` . This is my replacement for 
>>>>>>> `InternalRowReader`
>>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>>> structs where each level of nesting would be a nested columnar batch. 
>>>>>>> Lemme
>>>>>>> know what you think of this approach.
>>>>>>>  - I'v added value readers for all supported primitive types listed
>>>>>>> in `AvroDataTest`. There's a corresponding test for vectorized reader 
>>>>>>> under
>>>>>>> `TestSparkParquetVectorizedReader`
>>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to
>>>>>>> turn checkstyle off in build.gradle. Also skip tests while building..
>>>>>>> sorry! :-(
>>>>>>>
>>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this
>>>>>>> as it's not used. This was from my previous impl of Vectorization. I'v 
>>>>>>> kept
>>>>>>> it around to compare performance.
>>>>>>>
>>>>>>> Lemme know what folks think of the approach. I'm getting this
>>>>>>> working for our scale test benchmark and will report back with numbers.
>>>>>>> Feel free to run your own benchmarks and share.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Gautam.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> [1] -
>>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>>> [2] -
>>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>>> [3] -
>>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>>> more primitive type support.
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>>> source?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From: *Gautam 
>>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>>> *To: *Daniel Weeks 
>>>>>>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>>>>>>> dev@iceberg.apache.org>
>>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That would be great!
>>>&

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Gautam
wgroup for now.
>>>>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>>>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>>>>> contact point between Spark and Arrow interfaces.
>>>>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>>>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>>>>> which maps Structs to Columnar Batches. This allows us to have nested
>>>>>> structs where each level of nesting would be a nested columnar batch. 
>>>>>> Lemme
>>>>>> know what you think of this approach.
>>>>>>  - I'v added value readers for all supported primitive types listed
>>>>>> in `AvroDataTest`. There's a corresponding test for vectorized reader 
>>>>>> under
>>>>>> `TestSparkParquetVectorizedReader`
>>>>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>>>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! 
>>>>>> :-(
>>>>>>
>>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this
>>>>>> as it's not used. This was from my previous impl of Vectorization. I'v 
>>>>>> kept
>>>>>> it around to compare performance.
>>>>>>
>>>>>> Lemme know what folks think of the approach. I'm getting this working
>>>>>> for our scale test benchmark and will report back with numbers. Feel free
>>>>>> to run your own benchmarks and share.
>>>>>>
>>>>>> Cheers,
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] -
>>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>>> [2] -
>>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>>> [3] -
>>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam 
>>>>>> wrote:
>>>>>>
>>>>>>> Will do. Doing a bit of housekeeping on the code and also adding
>>>>>>> more primitive type support.
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Would it be possible to put the work in progress code in open
>>>>>>>> source?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Gautam 
>>>>>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>>> *To: *Daniel Weeks 
>>>>>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>>>>>> dev@iceberg.apache.org>
>>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> That would be great!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hey Gautam,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>>>> Arrow memory).  I think it would be good for us to get together and 
>>>>>>>> see if
>>>>>>>> we can collaborate on a common approach for this.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>&g

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Daniel Weeks
gt;>> Lemme
>>>>> know what you think of this approach.
>>>>>  - I'v added value readers for all supported primitive types listed in
>>>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>>>> `TestSparkParquetVectorizedReader`
>>>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! 
>>>>> :-(
>>>>>
>>>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this
>>>>> as it's not used. This was from my previous impl of Vectorization. I'v 
>>>>> kept
>>>>> it around to compare performance.
>>>>>
>>>>> Lemme know what folks think of the approach. I'm getting this working
>>>>> for our scale test benchmark and will report back with numbers. Feel free
>>>>> to run your own benchmarks and share.
>>>>>
>>>>> Cheers,
>>>>> -Gautam.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> [1] -
>>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>>> [2] -
>>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>>> [3] -
>>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>>
>>>>>
>>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam 
>>>>> wrote:
>>>>>
>>>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>>>> primitive type support.
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah 
>>>>>> wrote:
>>>>>>
>>>>>>> Would it be possible to put the work in progress code in open source?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From: *Gautam 
>>>>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>>> *To: *Daniel Weeks 
>>>>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>>>>> dev@iceberg.apache.org>
>>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> That would be great!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hey Gautam,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>>> Arrow memory).  I think it would be good for us to get together and see 
>>>>>>> if
>>>>>>> we can collaborate on a common approach for this.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I'll reach out directly and see if we can get together.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Dan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>>> without projection with schema set appropriately in `readSchema() `.. 
>>>>>>> the
>>>>>>> empty result was due to valuesRead not being set correctly on 
>>>>>>> FileIterator.
>>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hey Guys,
>>>>>>>
>>>>>>>Sorry bout the delay o

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Daniel Weeks
marks and share.
>>>>
>>>> Cheers,
>>>> -Gautam.
>>>>
>>>>
>>>>
>>>>
>>>> [1] -
>>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>>> [2] -
>>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>>> [3] -
>>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>>>>
>>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>>> primitive type support.
>>>>>
>>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah 
>>>>> wrote:
>>>>>
>>>>>> Would it be possible to put the work in progress code in open source?
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Gautam 
>>>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>>>> *To: *Daniel Weeks 
>>>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>>>> dev@iceberg.apache.org>
>>>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>>>
>>>>>>
>>>>>>
>>>>>> That would be great!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks 
>>>>>> wrote:
>>>>>>
>>>>>> Hey Gautam,
>>>>>>
>>>>>>
>>>>>>
>>>>>> We also have a couple people looking into vectorized reading (into
>>>>>> Arrow memory).  I think it would be good for us to get together and see 
>>>>>> if
>>>>>> we can collaborate on a common approach for this.
>>>>>>
>>>>>>
>>>>>>
>>>>>> I'll reach out directly and see if we can get together.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Dan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam 
>>>>>> wrote:
>>>>>>
>>>>>> Figured this out. I'm returning ColumnarBatch iterator directly
>>>>>> without projection with schema set appropriately in `readSchema() `.. the
>>>>>> empty result was due to valuesRead not being set correctly on 
>>>>>> FileIterator.
>>>>>> Did that and things are working. Will circle back with numbers soon.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 19, 2019 at 5:22 PM Gautam 
>>>>>> wrote:
>>>>>>
>>>>>> Hey Guys,
>>>>>>
>>>>>>Sorry bout the delay on this. Just got back on getting a
>>>>>> basic working implementation in Iceberg for Vectorization on primitive
>>>>>> types.
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Here's what I have so far :  *
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have added `ParquetValueReader` implementations for some basic
>>>>>> primitive types that build the respective Arrow Vector (`ValueVector`) 
>>>>>> viz.
>>>>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath 
>>>>>> each
>>>>>> value vector reader there are column iterators that read from the parquet
>>>>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>>>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>>>>> stitched together using a `ColumnarBatchReader` (which as the name 
>>>>>> suggests
>>>>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>>>>> work properly with the underlying interfaces.  I'v also made changes to
>>>>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>>>>>

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Gautam
This fails on master too btw. Just wondering if i'm doing something wrong
trying to run this.

On Fri, Jul 26, 2019 at 2:24 PM Gautam  wrote:

> I'v been trying to run the jmh benchmarks bundled within the project. I'v
> been running into issues with that .. have other hit this? Am I running
> these incorrectly?
>
>
> bash-3.2$ ./gradlew :iceberg-spark:jmh
> -PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
> -PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
> ..
> ...
> > Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
> error: plug-in not found: ErrorProne
>
> FAILURE: Build failed with an exception.
>
>
>
> Is there a config/plugin I need to add to build.gradle?
>
>
>
>
>
>
>
>
> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:
>
>> Thanks Gautam!
>>
>> We'll start taking a look at your code. What do you think about creating
>> a branch in the Iceberg repository where we can work on improving it
>> together, before merging it into master?
>>
>> Also, you mentioned performance comparisons. Do you have any early
>> results to share?
>>
>> rb
>>
>> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>>
>>> Hello Folks,
>>>
>>> I have checked in a WIP branch [1] with a working version of Vectorized
>>> reads for Iceberg reader. Here's the diff  [2].
>>>
>>> *Implementation Notes:*
>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>>> each iteration.
>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>>> copied from [3] . Added by @Daniel Weeks  . Thanks
>>> for that!
>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>>> the code)
>>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>>> to map Parquet types to appropriate value readers. I implemented the struct
>>> visitor so that the root schema can be mapped properly. This has the added
>>> benefit of vectorization support for structs, so yay!
>>>  - For the initial version the value readers read an entire row group
>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>> contact point between Spark and Arrow interfaces.
>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>> which maps Structs to Columnar Batches. This allows us to have nested
>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>> know what you think of this approach.
>>>  - I'v added value readers for all supported primitive types listed in
>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>> `TestSparkParquetVectorizedReader`
>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>
>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>>> around to compare performance.
>>>
>>> Lemme know what folks think of the approach. I'm getting this working
>>> for our scale test benchmark and will report back with numbers. Feel free
>>> to run your own benchmarks and share.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>> [3] -
>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>
>>>
>>> On Mon, Jul 22, 2019 at 2:33 PM Gaut

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-26 Thread Gautam
I'v been trying to run the jmh benchmarks bundled within the project. I'v
been running into issues with that .. have other hit this? Am I running
these incorrectly?


bash-3.2$ ./gradlew :iceberg-spark:jmh
-PjmhIncludeRegex=IcebergSourceFlatParquetDataFilterBenchmark
-PjmhOutputPath=benchmark/iceberg-source-flat-parquet-data-filter-benchmark-result.txt
..
...
> Task :iceberg-spark:jmhCompileGeneratedClasses FAILED
error: plug-in not found: ErrorProne

FAILURE: Build failed with an exception.



Is there a config/plugin I need to add to build.gradle?








On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:

> Thanks Gautam!
>
> We'll start taking a look at your code. What do you think about creating a
> branch in the Iceberg repository where we can work on improving it
> together, before merging it into master?
>
> Also, you mentioned performance comparisons. Do you have any early results
> to share?
>
> rb
>
> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>
>> Hello Folks,
>>
>> I have checked in a WIP branch [1] with a working version of Vectorized
>> reads for Iceberg reader. Here's the diff  [2].
>>
>> *Implementation Notes:*
>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>> each iteration.
>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>> copied from [3] . Added by @Daniel Weeks  . Thanks
>> for that!
>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>> the code)
>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>> to map Parquet types to appropriate value readers. I implemented the struct
>> visitor so that the root schema can be mapped properly. This has the added
>> benefit of vectorization support for structs, so yay!
>>  - For the initial version the value readers read an entire row group
>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>> Spark's ColumnVector implementation backed by Arrow. This is the first
>> contact point between Spark and Arrow interfaces.
>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>> which maps Structs to Columnar Batches. This allows us to have nested
>> structs where each level of nesting would be a nested columnar batch. Lemme
>> know what you think of this approach.
>>  - I'v added value readers for all supported primitive types listed in
>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>> `TestSparkParquetVectorizedReader`
>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>
>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>> around to compare performance.
>>
>> Lemme know what folks think of the approach. I'm getting this working for
>> our scale test benchmark and will report back with numbers. Feel free to
>> run your own benchmarks and share.
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> [1] -
>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>> [2] -
>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>> [3] -
>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>
>>
>> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>>
>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>> primitive type support.
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>>>
>>>> Would it be possible to put the work in progress code in open source?
>>>>
>>>>
>>>>
>>>> *From: *Gautam 
>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>> *To: *Daniel Weeks 
>&g

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-24 Thread Daniel Weeks
Gautam,

I've created a branch off current master:
https://github.com/apache/incubator-iceberg/tree/vectorized-read

I've also created a milestone, so feel free to add issues and we can
associate them with the milestone:
https://github.com/apache/incubator-iceberg/milestone/2

-Dan

On Wed, Jul 24, 2019 at 4:21 PM Gautam  wrote:

> +1 on having a branch. Lemme know once you do i'l rebase and open a PR
> against it.
>
> Will get back to you on perf numbers soon.
>
> On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:
>
>> Thanks Gautam!
>>
>> We'll start taking a look at your code. What do you think about creating
>> a branch in the Iceberg repository where we can work on improving it
>> together, before merging it into master?
>>
>> Also, you mentioned performance comparisons. Do you have any early
>> results to share?
>>
>> rb
>>
>> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>>
>>> Hello Folks,
>>>
>>> I have checked in a WIP branch [1] with a working version of Vectorized
>>> reads for Iceberg reader. Here's the diff  [2].
>>>
>>> *Implementation Notes:*
>>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>>> each iteration.
>>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>>> copied from [3] . Added by @Daniel Weeks  . Thanks
>>> for that!
>>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>>> the code)
>>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>>> to map Parquet types to appropriate value readers. I implemented the struct
>>> visitor so that the root schema can be mapped properly. This has the added
>>> benefit of vectorization support for structs, so yay!
>>>  - For the initial version the value readers read an entire row group
>>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>>> Spark's ColumnVector implementation backed by Arrow. This is the first
>>> contact point between Spark and Arrow interfaces.
>>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>>> which maps Structs to Columnar Batches. This allows us to have nested
>>> structs where each level of nesting would be a nested columnar batch. Lemme
>>> know what you think of this approach.
>>>  - I'v added value readers for all supported primitive types listed in
>>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>>> `TestSparkParquetVectorizedReader`
>>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>>
>>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>>> around to compare performance.
>>>
>>> Lemme know what folks think of the approach. I'm getting this working
>>> for our scale test benchmark and will report back with numbers. Feel free
>>> to run your own benchmarks and share.
>>>
>>> Cheers,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>>> [2] -
>>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>>> [3] -
>>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>>
>>>
>>> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>>>
>>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>>> primitive type support.
>>>>
>>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>>>>
>>>>> Would it be possible to put the work in progress co

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-24 Thread Gautam
+1 on having a branch. Lemme know once you do i'l rebase and open a PR
against it.

Will get back to you on perf numbers soon.

On Wed, Jul 24, 2019 at 2:03 PM Ryan Blue  wrote:

> Thanks Gautam!
>
> We'll start taking a look at your code. What do you think about creating a
> branch in the Iceberg repository where we can work on improving it
> together, before merging it into master?
>
> Also, you mentioned performance comparisons. Do you have any early results
> to share?
>
> rb
>
> On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:
>
>> Hello Folks,
>>
>> I have checked in a WIP branch [1] with a working version of Vectorized
>> reads for Iceberg reader. Here's the diff  [2].
>>
>> *Implementation Notes:*
>>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
>> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
>> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
>> each iteration.
>>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
>> copied from [3] . Added by @Daniel Weeks  . Thanks
>> for that!
>>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
>> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
>> the code)
>>  - `VectorizedSparkParquetReaders` contains the visitor implementations
>> to map Parquet types to appropriate value readers. I implemented the struct
>> visitor so that the root schema can be mapped properly. This has the added
>> benefit of vectorization support for structs, so yay!
>>  - For the initial version the value readers read an entire row group
>> into a single Arrow Field Vector. this i'd imagine will require tuning for
>> right batch sizing but i'v gone with one batch per rowgroup for now.
>>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
>> Spark's ColumnVector implementation backed by Arrow. This is the first
>> contact point between Spark and Arrow interfaces.
>>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
>> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
>> which maps Structs to Columnar Batches. This allows us to have nested
>> structs where each level of nesting would be a nested columnar batch. Lemme
>> know what you think of this approach.
>>  - I'v added value readers for all supported primitive types listed in
>> `AvroDataTest`. There's a corresponding test for vectorized reader under
>> `TestSparkParquetVectorizedReader`
>>  - I haven't fixed all the Checkstyle errors so you will have to turn
>> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>>
>> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
>> it's not used. This was from my previous impl of Vectorization. I'v kept it
>> around to compare performance.
>>
>> Lemme know what folks think of the approach. I'm getting this working for
>> our scale test benchmark and will report back with numbers. Feel free to
>> run your own benchmarks and share.
>>
>> Cheers,
>> -Gautam.
>>
>>
>>
>>
>> [1] -
>> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
>> [2] -
>> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
>> [3] -
>> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>>
>>
>> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>>
>>> Will do. Doing a bit of housekeeping on the code and also adding more
>>> primitive type support.
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>>>
>>>> Would it be possible to put the work in progress code in open source?
>>>>
>>>>
>>>>
>>>> *From: *Gautam 
>>>> *Reply-To: *"dev@iceberg.apache.org" 
>>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>>> *To: *Daniel Weeks 
>>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>>> dev@iceberg.apache.org>
>>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>>
>>>>
>>>>
>>>> That would be great!
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks 
>>>> wrote:
>>>>
>>>> H

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-24 Thread Ryan Blue
Thanks Gautam!

We'll start taking a look at your code. What do you think about creating a
branch in the Iceberg repository where we can work on improving it
together, before merging it into master?

Also, you mentioned performance comparisons. Do you have any early results
to share?

rb

On Tue, Jul 23, 2019 at 3:40 PM Gautam  wrote:

> Hello Folks,
>
> I have checked in a WIP branch [1] with a working version of Vectorized
> reads for Iceberg reader. Here's the diff  [2].
>
> *Implementation Notes:*
>  - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
> the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
> usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
> each iteration.
>  - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
> copied from [3] . Added by @Daniel Weeks  . Thanks
> for that!
>  - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
> reading/decoding the Parquet rowgroups (aka pagestores as referred to in
> the code)
>  - `VectorizedSparkParquetReaders` contains the visitor implementations to
> map Parquet types to appropriate value readers. I implemented the struct
> visitor so that the root schema can be mapped properly. This has the added
> benefit of vectorization support for structs, so yay!
>  - For the initial version the value readers read an entire row group into
> a single Arrow Field Vector. this i'd imagine will require tuning for right
> batch sizing but i'v gone with one batch per rowgroup for now.
>  - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
> Spark's ColumnVector implementation backed by Arrow. This is the first
> contact point between Spark and Arrow interfaces.
>  - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
> `ColumnarBatchReader` . This is my replacement for `InternalRowReader`
> which maps Structs to Columnar Batches. This allows us to have nested
> structs where each level of nesting would be a nested columnar batch. Lemme
> know what you think of this approach.
>  - I'v added value readers for all supported primitive types listed in
> `AvroDataTest`. There's a corresponding test for vectorized reader under
> `TestSparkParquetVectorizedReader`
>  - I haven't fixed all the Checkstyle errors so you will have to turn
> checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(
>
> *P.S*. There's some unused code under ArrowReader.java. Ignore this as
> it's not used. This was from my previous impl of Vectorization. I'v kept it
> around to compare performance.
>
> Lemme know what folks think of the approach. I'm getting this working for
> our scale test benchmark and will report back with numbers. Feel free to
> run your own benchmarks and share.
>
> Cheers,
> -Gautam.
>
>
>
>
> [1] -
> https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
> [2] -
> https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
> [3] -
> https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
>
>
> On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:
>
>> Will do. Doing a bit of housekeeping on the code and also adding more
>> primitive type support.
>>
>> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>>
>>> Would it be possible to put the work in progress code in open source?
>>>
>>>
>>>
>>> *From: *Gautam 
>>> *Reply-To: *"dev@iceberg.apache.org" 
>>> *Date: *Monday, July 22, 2019 at 9:46 AM
>>> *To: *Daniel Weeks 
>>> *Cc: *Ryan Blue , Iceberg Dev List <
>>> dev@iceberg.apache.org>
>>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>>
>>>
>>>
>>> That would be great!
>>>
>>>
>>>
>>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:
>>>
>>> Hey Gautam,
>>>
>>>
>>>
>>> We also have a couple people looking into vectorized reading (into Arrow
>>> memory).  I think it would be good for us to get together and see if we can
>>> collaborate on a common approach for this.
>>>
>>>
>>>
>>> I'll reach out directly and see if we can get together.
>>>
>>>
>>>
>>> -Dan
>>>
>>>
>>>
>>> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>>>
>>> Figured this out. I'm return

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-23 Thread Gautam
Hello Folks,

I have checked in a WIP branch [1] with a working version of Vectorized
reads for Iceberg reader. Here's the diff  [2].

*Implementation Notes:*
 - Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct
the DataSourceV2ScanExec to use `planBatchPartitions()` instead of the
usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on
each iteration.
 - `ArrowSchemaUtil` contains Iceberg to Arrow type conversion. This was
copied from [3] . Added by @Daniel Weeks  . Thanks for
that!
 - `VectorizedParquetValueReaders` contains ParquetValueReaders used for
reading/decoding the Parquet rowgroups (aka pagestores as referred to in
the code)
 - `VectorizedSparkParquetReaders` contains the visitor implementations to
map Parquet types to appropriate value readers. I implemented the struct
visitor so that the root schema can be mapped properly. This has the added
benefit of vectorization support for structs, so yay!
 - For the initial version the value readers read an entire row group into
a single Arrow Field Vector. this i'd imagine will require tuning for right
batch sizing but i'v gone with one batch per rowgroup for now.
 - Arrow Field Vectors are wrapped using `ArrowColumnVector` which is
Spark's ColumnVector implementation backed by Arrow. This is the first
contact point between Spark and Arrow interfaces.
 - ArrowColumnVectors are stitched together into a `ColumnarBatch` by
`ColumnarBatchReader` . This is my replacement for `InternalRowReader`
which maps Structs to Columnar Batches. This allows us to have nested
structs where each level of nesting would be a nested columnar batch. Lemme
know what you think of this approach.
 - I'v added value readers for all supported primitive types listed in
`AvroDataTest`. There's a corresponding test for vectorized reader under
`TestSparkParquetVectorizedReader`
 - I haven't fixed all the Checkstyle errors so you will have to turn
checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(

*P.S*. There's some unused code under ArrowReader.java. Ignore this as it's
not used. This was from my previous impl of Vectorization. I'v kept it
around to compare performance.

Lemme know what folks think of the approach. I'm getting this working for
our scale test benchmark and will report back with numbers. Feel free to
run your own benchmarks and share.

Cheers,
-Gautam.




[1] -
https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
[2] -
https://github.com/apache/incubator-iceberg/compare/master...prodeezy:issue-9-support-arrow-based-reading-WIP
[3] -
https://github.com/apache/incubator-iceberg/blob/72e3485510e9cbec05dd30e2e7ce5d03071f400d/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java


On Mon, Jul 22, 2019 at 2:33 PM Gautam  wrote:

> Will do. Doing a bit of housekeeping on the code and also adding more
> primitive type support.
>
> On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:
>
>> Would it be possible to put the work in progress code in open source?
>>
>>
>>
>> *From: *Gautam 
>> *Reply-To: *"dev@iceberg.apache.org" 
>> *Date: *Monday, July 22, 2019 at 9:46 AM
>> *To: *Daniel Weeks 
>> *Cc: *Ryan Blue , Iceberg Dev List <
>> dev@iceberg.apache.org>
>> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>>
>>
>>
>> That would be great!
>>
>>
>>
>> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:
>>
>> Hey Gautam,
>>
>>
>>
>> We also have a couple people looking into vectorized reading (into Arrow
>> memory).  I think it would be good for us to get together and see if we can
>> collaborate on a common approach for this.
>>
>>
>>
>> I'll reach out directly and see if we can get together.
>>
>>
>>
>> -Dan
>>
>>
>>
>> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>>
>> Figured this out. I'm returning ColumnarBatch iterator directly without
>> projection with schema set appropriately in `readSchema() `.. the empty
>> result was due to valuesRead not being set correctly on FileIterator. Did
>> that and things are working. Will circle back with numbers soon.
>>
>>
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>>
>> Hey Guys,
>>
>>Sorry bout the delay on this. Just got back on getting a basic
>> working implementation in Iceberg for Vectorization on primitive types.
>>
>>
>>
>> *Here's what I have so far :  *
>>
>>
>>
>> I have added `ParquetValueReader` implementations for some basic
>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-22 Thread Gautam
Will do. Doing a bit of housekeeping on the code and also adding more
primitive type support.

On Mon, Jul 22, 2019 at 1:41 PM Matt Cheah  wrote:

> Would it be possible to put the work in progress code in open source?
>
>
>
> *From: *Gautam 
> *Reply-To: *"dev@iceberg.apache.org" 
> *Date: *Monday, July 22, 2019 at 9:46 AM
> *To: *Daniel Weeks 
> *Cc: *Ryan Blue , Iceberg Dev List <
> dev@iceberg.apache.org>
> *Subject: *Re: Approaching Vectorized Reading in Iceberg ..
>
>
>
> That would be great!
>
>
>
> On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:
>
> Hey Gautam,
>
>
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
>
>
> I'll reach out directly and see if we can get together.
>
>
>
> -Dan
>
>
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>
> Figured this out. I'm returning ColumnarBatch iterator directly without
> projection with schema set appropriately in `readSchema() `.. the empty
> result was due to valuesRead not being set correctly on FileIterator. Did
> that and things are working. Will circle back with numbers soon.
>
>
>
> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>
> Hey Guys,
>
>Sorry bout the delay on this. Just got back on getting a basic
> working implementation in Iceberg for Vectorization on primitive types.
>
>
>
> *Here's what I have so far :  *
>
>
>
> I have added `ParquetValueReader` implementations for some basic primitive
> types that build the respective Arrow Vector (`ValueVector`) viz.
> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
> value vector reader there are column iterators that read from the parquet
> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
> stitched together using a `ColumnarBatchReader` (which as the name suggests
> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
> work properly with the underlying interfaces.  I'v also made changes to
> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
> expects ColumnarBatch instances (instead of InternalRow). The query
> planning runtime works fine with these changes.
>
>
>
> Although it fails during query execution, the bit it's  currently failing
> at is this line of code : 
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
> [github.com]
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_incubator-2Diceberg_blob_master_spark_src_main_java_org_apache_iceberg_spark_source_Reader.java-23L414&d=DwMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E99EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=UW1Nb5KZOPeIqsjzFnKhGQaxYHT_wAI_2PvgFUlfAoY&s=7wzoBoRwCjQjgamnHukQSe0wiATMnGbYhfJQpXfSMks&e=>
>
>
>
> This code, I think,  tries to apply the iterator's schema projection on
> the InternalRow instances. This seems to be tightly coupled to InternalRow
> as Spark's catalyst expressions have implemented the UnsafeProjection for
> InternalRow only. If I take this out and just return the
> `Iterator` iterator I built it returns empty result on the
> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
> There's a Todo in the code that says "*remove the projection by reporting
> the iterator's schema back to Spark*".  Is there a simple way to
> communicate that to Spark for my new iterator? Any pointers on how to get
> around this?
>
>
>
>
>
> Thanks and Regards,
>
> -Gautam.
>
>
>
>
>
>
>
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>
> Replies inline.
>
>
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>
> Thanks for responding Ryan,
>
>
>
> Couple of follow up questions on ParquetValueReader for Arrow..
>
>
>
> I'd like to start with testing Arrow out with readers for primitive type
> and incrementally add in Struct/Array support, also ArrowWriter [1]
> currently doesn't have converters for map type. How can I default these
> types to regular materialization whilst supporting Arrow based support for
> primitives?
>
>
>
> We should look at what Spark does to handle maps.
>
>
>
> I think we should get the prototype working with test ca

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-22 Thread Matt Cheah
Would it be possible to put the work in progress code in open source?

 

From: Gautam 
Reply-To: "dev@iceberg.apache.org" 
Date: Monday, July 22, 2019 at 9:46 AM
To: Daniel Weeks 
Cc: Ryan Blue , Iceberg Dev List 
Subject: Re: Approaching Vectorized Reading in Iceberg ..

 

That would be great!

 

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:

Hey Gautam, 

 

We also have a couple people looking into vectorized reading (into Arrow 
memory).  I think it would be good for us to get together and see if we can 
collaborate on a common approach for this.

 

I'll reach out directly and see if we can get together.

 

-Dan

 

On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:

Figured this out. I'm returning ColumnarBatch iterator directly without 
projection with schema set appropriately in `readSchema() `.. the empty result 
was due to valuesRead not being set correctly on FileIterator. Did that and 
things are working. Will circle back with numbers soon. 

 

On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:

Hey Guys, 

   Sorry bout the delay on this. Just got back on getting a basic 
working implementation in Iceberg for Vectorization on primitive types. 

 

Here's what I have so far :  

 

I have added `ParquetValueReader` implementations for some basic primitive 
types that build the respective Arrow Vector (`ValueVector`) viz. `IntVector` 
for int, `VarCharVector` for strings and so on. Underneath each value vector 
reader there are column iterators that read from the parquet pagestores 
(rowgroups) in chunks. These `ValueVector-s` are lined up as 
`ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and 
stitched together using a `ColumnarBatchReader` (which as the name suggests 
wraps ColumnarBatches in the iterator)   I'v verified that these pieces work 
properly with the underlying interfaces.  I'v also made changes to Iceberg's 
`Reader` to  implement `planBatchPartitions()` (to add the 
`SupportsScanColumnarBatch` mixin to the reader).  So the reader now expects 
ColumnarBatch instances (instead of InternalRow). The query planning runtime 
works fine with these changes.

 

Although it fails during query execution, the bit it's  currently failing at is 
this line of code : 
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
 [github.com]

 

This code, I think,  tries to apply the iterator's schema projection on the 
InternalRow instances. This seems to be tightly coupled to InternalRow as 
Spark's catalyst expressions have implemented the UnsafeProjection for 
InternalRow only. If I take this out and just return the 
`Iterator` iterator I built it returns empty result on the 
client. I'm guessing this is coz Spark is unaware of the iterator's schema? 
There's a Todo in the code that says "remove the projection by reporting the 
iterator's schema back to Spark".  Is there a simple way to communicate that to 
Spark for my new iterator? Any pointers on how to get around this?

 

 

Thanks and Regards,

-Gautam. 

 

 

 

 

On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:

Replies inline.

 

On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:

Thanks for responding Ryan,  

 

Couple of follow up questions on ParquetValueReader for Arrow.. 

 

I'd like to start with testing Arrow out with readers for primitive type and 
incrementally add in Struct/Array support, also ArrowWriter [1] currently 
doesn't have converters for map type. How can I default these types to regular 
materialization whilst supporting Arrow based support for primitives? 

 

We should look at what Spark does to handle maps.

 

I think we should get the prototype working with test cases that don't have 
maps, structs, or lists. Just getting primitives working is a good start and 
just won't hit these problems.

 

Lemme know if this makes sense...

 

- I extend  PrimitiveReader (for Arrow) that loads primitive types into 
ArrowColumnVectors of corresponding column types by iterating over underlying 
ColumnIterator n times, where n is size of batch.

 

Sounds good to me. I'm not sure about extending vs wrapping because I'm not too 
familiar with the Arrow APIs.

 

- Reader.newParquetIterable()  maps primitive column types to the newly added 
ArrowParquetValueReader but for other types (nested types, etc.) uses current 
InternalRow based ValueReaders 

 

Sounds good for primitives, but I would just leave the nested types 
un-implemented for now.

 

- Stitch the columns vectors together to create ColumnarBatch, (Since 
SupportsScanColumnarBatch mixin currently expects this ) .. although I'm a bit 
lost on how the stitching of columns happens currently? .. and how the 
ArrowColumnVectors could  be stitched alongside regular columns that don't have 
arrow based support ?

 

I don't think that 

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-22 Thread Gautam
That would be great!

On Mon, Jul 22, 2019 at 9:12 AM Daniel Weeks  wrote:

> Hey Gautam,
>
> We also have a couple people looking into vectorized reading (into Arrow
> memory).  I think it would be good for us to get together and see if we can
> collaborate on a common approach for this.
>
> I'll reach out directly and see if we can get together.
>
> -Dan
>
> On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:
>
>> Figured this out. I'm returning ColumnarBatch iterator directly without
>> projection with schema set appropriately in `readSchema() `.. the empty
>> result was due to valuesRead not being set correctly on FileIterator. Did
>> that and things are working. Will circle back with numbers soon.
>>
>> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>>
>>> Hey Guys,
>>>Sorry bout the delay on this. Just got back on getting a
>>> basic working implementation in Iceberg for Vectorization on primitive
>>> types.
>>>
>>> *Here's what I have so far :  *
>>>
>>> I have added `ParquetValueReader` implementations for some basic
>>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>>> value vector reader there are column iterators that read from the parquet
>>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>>> work properly with the underlying interfaces.  I'v also made changes to
>>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>>> expects ColumnarBatch instances (instead of InternalRow). The query
>>> planning runtime works fine with these changes.
>>>
>>> Although it fails during query execution, the bit it's  currently
>>> failing at is this line of code :
>>> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>>
>>> This code, I think,  tries to apply the iterator's schema projection on
>>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>>> InternalRow only. If I take this out and just return the
>>> `Iterator` iterator I built it returns empty result on the
>>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>>> There's a Todo in the code that says "*remove the projection by
>>> reporting the iterator's schema back to Spark*".  Is there a simple way
>>> to communicate that to Spark for my new iterator? Any pointers on how to
>>> get around this?
>>>
>>>
>>> Thanks and Regards,
>>> -Gautam.
>>>
>>>
>>>
>>>
>>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>>>
 Replies inline.

 On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:

> Thanks for responding Ryan,
>
> Couple of follow up questions on ParquetValueReader for Arrow..
>
> I'd like to start with testing Arrow out with readers for primitive
> type and incrementally add in Struct/Array support, also ArrowWriter [1]
> currently doesn't have converters for map type. How can I default these
> types to regular materialization whilst supporting Arrow based support for
> primitives?
>

 We should look at what Spark does to handle maps.

 I think we should get the prototype working with test cases that don't
 have maps, structs, or lists. Just getting primitives working is a good
 start and just won't hit these problems.


> Lemme know if this makes sense...
>
> - I extend  PrimitiveReader (for Arrow) that loads primitive types
> into ArrowColumnVectors of corresponding column types by iterating over
> underlying ColumnIterator *n times*, where n is size of batch.
>

 Sounds good to me. I'm not sure about extending vs wrapping because I'm
 not too familiar with the Arrow APIs.


> - Reader.newParquetIterable()  maps primitive column types to the
> newly added ArrowParquetValueReader but for other types (nested types,
> etc.) uses current *InternalRow* based ValueReaders
>

 Sounds good for primitives, but I would just leave the nested types
 un-implemented for now.


> - Stitch the columns vectors together to create ColumnarBatch, (Since
> *SupportsScanColumnarBatch* mixin currently expects this ) ..
> *although* *I'm a bit lost on how the stitching of columns happens
> currently*? .. and how the ArrowColumnVectors could  be stitched
> alongside regular columns that don't have arrow based support ?
>

 I don't think that you can mix regular columns and Arrow columns. It
 has to be all one o

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-22 Thread Daniel Weeks
Hey Gautam,

We also have a couple people looking into vectorized reading (into Arrow
memory).  I think it would be good for us to get together and see if we can
collaborate on a common approach for this.

I'll reach out directly and see if we can get together.

-Dan

On Sun, Jul 21, 2019 at 10:35 PM Gautam  wrote:

> Figured this out. I'm returning ColumnarBatch iterator directly without
> projection with schema set appropriately in `readSchema() `.. the empty
> result was due to valuesRead not being set correctly on FileIterator. Did
> that and things are working. Will circle back with numbers soon.
>
> On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:
>
>> Hey Guys,
>>Sorry bout the delay on this. Just got back on getting a basic
>> working implementation in Iceberg for Vectorization on primitive types.
>>
>> *Here's what I have so far :  *
>>
>> I have added `ParquetValueReader` implementations for some basic
>> primitive types that build the respective Arrow Vector (`ValueVector`) viz.
>> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
>> value vector reader there are column iterators that read from the parquet
>> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
>> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
>> stitched together using a `ColumnarBatchReader` (which as the name suggests
>> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
>> work properly with the underlying interfaces.  I'v also made changes to
>> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
>> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
>> expects ColumnarBatch instances (instead of InternalRow). The query
>> planning runtime works fine with these changes.
>>
>> Although it fails during query execution, the bit it's  currently failing
>> at is this line of code :
>> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>>
>> This code, I think,  tries to apply the iterator's schema projection on
>> the InternalRow instances. This seems to be tightly coupled to InternalRow
>> as Spark's catalyst expressions have implemented the UnsafeProjection for
>> InternalRow only. If I take this out and just return the
>> `Iterator` iterator I built it returns empty result on the
>> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
>> There's a Todo in the code that says "*remove the projection by
>> reporting the iterator's schema back to Spark*".  Is there a simple way
>> to communicate that to Spark for my new iterator? Any pointers on how to
>> get around this?
>>
>>
>> Thanks and Regards,
>> -Gautam.
>>
>>
>>
>>
>> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>>
>>> Replies inline.
>>>
>>> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>>>
 Thanks for responding Ryan,

 Couple of follow up questions on ParquetValueReader for Arrow..

 I'd like to start with testing Arrow out with readers for primitive
 type and incrementally add in Struct/Array support, also ArrowWriter [1]
 currently doesn't have converters for map type. How can I default these
 types to regular materialization whilst supporting Arrow based support for
 primitives?

>>>
>>> We should look at what Spark does to handle maps.
>>>
>>> I think we should get the prototype working with test cases that don't
>>> have maps, structs, or lists. Just getting primitives working is a good
>>> start and just won't hit these problems.
>>>
>>>
 Lemme know if this makes sense...

 - I extend  PrimitiveReader (for Arrow) that loads primitive types into
 ArrowColumnVectors of corresponding column types by iterating over
 underlying ColumnIterator *n times*, where n is size of batch.

>>>
>>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>>> not too familiar with the Arrow APIs.
>>>
>>>
 - Reader.newParquetIterable()  maps primitive column types to the newly
 added ArrowParquetValueReader but for other types (nested types, etc.) uses
 current *InternalRow* based ValueReaders

>>>
>>> Sounds good for primitives, but I would just leave the nested types
>>> un-implemented for now.
>>>
>>>
 - Stitch the columns vectors together to create ColumnarBatch, (Since
 *SupportsScanColumnarBatch* mixin currently expects this ) ..
 *although* *I'm a bit lost on how the stitching of columns happens
 currently*? .. and how the ArrowColumnVectors could  be stitched
 alongside regular columns that don't have arrow based support ?

>>>
>>> I don't think that you can mix regular columns and Arrow columns. It has
>>> to be all one or the other. That's why it's easier to start with
>>> primitives, then add structs, then lists, and finally maps.
>>>
>>>
 - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
 t

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-21 Thread Gautam
Figured this out. I'm returning ColumnarBatch iterator directly without
projection with schema set appropriately in `readSchema() `.. the empty
result was due to valuesRead not being set correctly on FileIterator. Did
that and things are working. Will circle back with numbers soon.

On Fri, Jul 19, 2019 at 5:22 PM Gautam  wrote:

> Hey Guys,
>Sorry bout the delay on this. Just got back on getting a basic
> working implementation in Iceberg for Vectorization on primitive types.
>
> *Here's what I have so far :  *
>
> I have added `ParquetValueReader` implementations for some basic primitive
> types that build the respective Arrow Vector (`ValueVector`) viz.
> `IntVector` for int, `VarCharVector` for strings and so on. Underneath each
> value vector reader there are column iterators that read from the parquet
> pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
> `ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
> stitched together using a `ColumnarBatchReader` (which as the name suggests
> wraps ColumnarBatches in the iterator)   I'v verified that these pieces
> work properly with the underlying interfaces.  I'v also made changes to
> Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
> `SupportsScanColumnarBatch` mixin to the reader).  So the reader now
> expects ColumnarBatch instances (instead of InternalRow). The query
> planning runtime works fine with these changes.
>
> Although it fails during query execution, the bit it's  currently failing
> at is this line of code :
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>
> This code, I think,  tries to apply the iterator's schema projection on
> the InternalRow instances. This seems to be tightly coupled to InternalRow
> as Spark's catalyst expressions have implemented the UnsafeProjection for
> InternalRow only. If I take this out and just return the
> `Iterator` iterator I built it returns empty result on the
> client. I'm guessing this is coz Spark is unaware of the iterator's schema?
> There's a Todo in the code that says "*remove the projection by reporting
> the iterator's schema back to Spark*".  Is there a simple way to
> communicate that to Spark for my new iterator? Any pointers on how to get
> around this?
>
>
> Thanks and Regards,
> -Gautam.
>
>
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>
>> Replies inline.
>>
>> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>>
>>> Thanks for responding Ryan,
>>>
>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>
>>> I'd like to start with testing Arrow out with readers for primitive type
>>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>>> currently doesn't have converters for map type. How can I default these
>>> types to regular materialization whilst supporting Arrow based support for
>>> primitives?
>>>
>>
>> We should look at what Spark does to handle maps.
>>
>> I think we should get the prototype working with test cases that don't
>> have maps, structs, or lists. Just getting primitives working is a good
>> start and just won't hit these problems.
>>
>>
>>> Lemme know if this makes sense...
>>>
>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>> ArrowColumnVectors of corresponding column types by iterating over
>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>
>>
>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>> not too familiar with the Arrow APIs.
>>
>>
>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>> current *InternalRow* based ValueReaders
>>>
>>
>> Sounds good for primitives, but I would just leave the nested types
>> un-implemented for now.
>>
>>
>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although*
>>>  *I'm a bit lost on how the stitching of columns happens currently*? ..
>>> and how the ArrowColumnVectors could  be stitched alongside regular columns
>>> that don't have arrow based support ?
>>>
>>
>> I don't think that you can mix regular columns and Arrow columns. It has
>> to be all one or the other. That's why it's easier to start with
>> primitives, then add structs, then lists, and finally maps.
>>
>>
>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>
>>
>> We will probably need two paths. One for columnar batches and one for
>> row-based reads. That doesn't need to be done right away and what you
>> already have in your working copy makes sense as a start.
>>
>>
>>> That's a lot of questions! :-) but hope i'm making sense.
>>>
>>> -Gautam.
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/apache/spark/blob/master/sq

Re: Approaching Vectorized Reading in Iceberg ..

2019-07-19 Thread Gautam
Hey Guys,
   Sorry bout the delay on this. Just got back on getting a basic
working implementation in Iceberg for Vectorization on primitive types.

*Here's what I have so far :  *

I have added `ParquetValueReader` implementations for some basic primitive
types that build the respective Arrow Vector (`ValueVector`) viz.
`IntVector` for int, `VarCharVector` for strings and so on. Underneath each
value vector reader there are column iterators that read from the parquet
pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
`ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
stitched together using a `ColumnarBatchReader` (which as the name suggests
wraps ColumnarBatches in the iterator)   I'v verified that these pieces
work properly with the underlying interfaces.  I'v also made changes to
Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
`SupportsScanColumnarBatch` mixin to the reader).  So the reader now
expects ColumnarBatch instances (instead of InternalRow). The query
planning runtime works fine with these changes.

Although it fails during query execution, the bit it's  currently failing
at is this line of code :
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

This code, I think,  tries to apply the iterator's schema projection on the
InternalRow instances. This seems to be tightly coupled to InternalRow as
Spark's catalyst expressions have implemented the UnsafeProjection for
InternalRow only. If I take this out and just return the
`Iterator` iterator I built it returns empty result on the
client. I'm guessing this is coz Spark is unaware of the iterator's schema?
There's a Todo in the code that says "*remove the projection by reporting
the iterator's schema back to Spark*".  Is there a simple way to
communicate that to Spark for my new iterator? Any pointers on how to get
around this?


Thanks and Regards,
-Gautam.




On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:

> Replies inline.
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>
>> Thanks for responding Ryan,
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>> I'd like to start with testing Arrow out with readers for primitive type
>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>> currently doesn't have converters for map type. How can I default these
>> types to regular materialization whilst supporting Arrow based support for
>> primitives?
>>
>
> We should look at what Spark does to handle maps.
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>> Lemme know if this makes sense...
>>
>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>> ArrowColumnVectors of corresponding column types by iterating over
>> underlying ColumnIterator *n times*, where n is size of batch.
>>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>> - Reader.newParquetIterable()  maps primitive column types to the newly
>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>> current *InternalRow* based ValueReaders
>>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how the ArrowColumnVectors could  be stitched alongside regular columns
>> that don't have arrow based support ?
>>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>> -Gautam.
>>
>>
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Approaching Vectorized Reading in Iceberg ..

2019-06-21 Thread Ryan Blue
The vectorized reader in Spark is only used if the schema is flat.

On Fri, Jun 14, 2019 at 5:45 PM Gautam  wrote:

>
> Agree with the approach of getting this working for primitive types only.
> I'l work on a prototype assuming just primitive types for now.
>
> I don't think that you can mix regular columns and Arrow columns. It has
>> to be all one or the other.
>
>
> I was jsut curious about this coz Vanilla Spark reader (with
> vectorization) doesn't support batching on nested fields today but it's
> still able to do vectorization on data with nested/non-nested. This is not
> needed for my poc but would be good to know so if we can leverage this for
> our implementation. Either ways, i'l get to it when this step is done.
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:
>
>> Replies inline.
>>
>> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>>
>>> Thanks for responding Ryan,
>>>
>>> Couple of follow up questions on ParquetValueReader for Arrow..
>>>
>>> I'd like to start with testing Arrow out with readers for primitive type
>>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>>> currently doesn't have converters for map type. How can I default these
>>> types to regular materialization whilst supporting Arrow based support for
>>> primitives?
>>>
>>
>> We should look at what Spark does to handle maps.
>>
>> I think we should get the prototype working with test cases that don't
>> have maps, structs, or lists. Just getting primitives working is a good
>> start and just won't hit these problems.
>>
>>
>>> Lemme know if this makes sense...
>>>
>>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>>> ArrowColumnVectors of corresponding column types by iterating over
>>> underlying ColumnIterator *n times*, where n is size of batch.
>>>
>>
>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>> not too familiar with the Arrow APIs.
>>
>>
>>> - Reader.newParquetIterable()  maps primitive column types to the newly
>>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>>> current *InternalRow* based ValueReaders
>>>
>>
>> Sounds good for primitives, but I would just leave the nested types
>> un-implemented for now.
>>
>>
>>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although*
>>>  *I'm a bit lost on how the stitching of columns happens currently*? ..
>>> and how the ArrowColumnVectors could  be stitched alongside regular columns
>>> that don't have arrow based support ?
>>>
>>
>> I don't think that you can mix regular columns and Arrow columns. It has
>> to be all one or the other. That's why it's easier to start with
>> primitives, then add structs, then lists, and finally maps.
>>
>>
>>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so
>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>
>>
>> We will probably need two paths. One for columnar batches and one for
>> row-based reads. That doesn't need to be done right away and what you
>> already have in your working copy makes sense as a start.
>>
>>
>>> That's a lot of questions! :-) but hope i'm making sense.
>>>
>>> -Gautam.
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Gautam
Agree with the approach of getting this working for primitive types only.
I'l work on a prototype assuming just primitive types for now.

I don't think that you can mix regular columns and Arrow columns. It has to
> be all one or the other.


I was jsut curious about this coz Vanilla Spark reader (with vectorization)
doesn't support batching on nested fields today but it's still able to do
vectorization on data with nested/non-nested. This is not needed for my poc
but would be good to know so if we can leverage this for our
implementation. Either ways, i'l get to it when this step is done.


On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue  wrote:

> Replies inline.
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:
>
>> Thanks for responding Ryan,
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>> I'd like to start with testing Arrow out with readers for primitive type
>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>> currently doesn't have converters for map type. How can I default these
>> types to regular materialization whilst supporting Arrow based support for
>> primitives?
>>
>
> We should look at what Spark does to handle maps.
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>> Lemme know if this makes sense...
>>
>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>> ArrowColumnVectors of corresponding column types by iterating over
>> underlying ColumnIterator *n times*, where n is size of batch.
>>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>> - Reader.newParquetIterable()  maps primitive column types to the newly
>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>> current *InternalRow* based ValueReaders
>>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how the ArrowColumnVectors could  be stitched alongside regular columns
>> that don't have arrow based support ?
>>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>> -Gautam.
>>
>>
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Ryan Blue
Replies inline.

On Fri, Jun 14, 2019 at 1:11 AM Gautam  wrote:

> Thanks for responding Ryan,
>
> Couple of follow up questions on ParquetValueReader for Arrow..
>
> I'd like to start with testing Arrow out with readers for primitive type
> and incrementally add in Struct/Array support, also ArrowWriter [1]
> currently doesn't have converters for map type. How can I default these
> types to regular materialization whilst supporting Arrow based support for
> primitives?
>

We should look at what Spark does to handle maps.

I think we should get the prototype working with test cases that don't have
maps, structs, or lists. Just getting primitives working is a good start
and just won't hit these problems.


> Lemme know if this makes sense...
>
> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
> ArrowColumnVectors of corresponding column types by iterating over
> underlying ColumnIterator *n times*, where n is size of batch.
>

Sounds good to me. I'm not sure about extending vs wrapping because I'm not
too familiar with the Arrow APIs.


> - Reader.newParquetIterable()  maps primitive column types to the newly
> added ArrowParquetValueReader but for other types (nested types, etc.) uses
> current *InternalRow* based ValueReaders
>

Sounds good for primitives, but I would just leave the nested types
un-implemented for now.


> - Stitch the columns vectors together to create ColumnarBatch, (Since
> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
> a bit lost on how the stitching of columns happens currently*? .. and how
> the ArrowColumnVectors could  be stitched alongside regular columns that
> don't have arrow based support ?
>

I don't think that you can mix regular columns and Arrow columns. It has to
be all one or the other. That's why it's easier to start with primitives,
then add structs, then lists, and finally maps.


> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
> DataSourceV2ScanExec starts using ColumnarBatch scans
>

We will probably need two paths. One for columnar batches and one for
row-based reads. That doesn't need to be done right away and what you
already have in your working copy makes sense as a start.


> That's a lot of questions! :-) but hope i'm making sense.
>
> -Gautam.
>
>
>
> [1] -
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Gautam
Thanks for responding Ryan,

Couple of follow up questions on ParquetValueReader for Arrow..

I'd like to start with testing Arrow out with readers for primitive type
and incrementally add in Struct/Array support, also ArrowWriter [1]
currently doesn't have converters for map type. How can I default these
types to regular materialization whilst supporting Arrow based support for
primitives?

Lemme know if this makes sense...

- I extend  PrimitiveReader (for Arrow) that loads primitive types into
ArrowColumnVectors of corresponding column types by iterating over
underlying ColumnIterator *n times*, where n is size of batch.
- Reader.newParquetIterable()  maps primitive column types to the newly
added ArrowParquetValueReader but for other types (nested types, etc.) uses
current *InternalRow* based ValueReaders
- Stitch the columns vectors together to create ColumnarBatch, (Since
*SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
a bit lost on how the stitching of columns happens currently*? .. and how
the ArrowColumnVectors could  be stitched alongside regular columns that
don't have arrow based support ?
- Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
DataSourceV2ScanExec starts using ColumnarBatch scans


That's a lot of questions! :-) but hope i'm making sense.

-Gautam.



[1] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

On Thu, Jun 13, 2019 at 9:39 AM Ryan Blue  wrote:

> Sounds like a good start. I think the next step is to avoid using the
> ParquetReader.FileIterator and deserialize directly from TripleIterator
> .
> I think the reason why this is taking longer is that (I think) you’re doing
> all the work to materialize the data in rows, and then converting to
> vectors.
>
> To work on top of TripleIterator, I think you need to create a
> ParquetValueReader for Arrow batches. That would be configured with a
> batch size, so that when you pass it into ParquetReader, the FileIterator
> returns batches instead of individual rows.
>
> Does that make sense?
>
> rb
>
> On Wed, Jun 12, 2019 at 11:22 PM Gautam  wrote:
>
>> Hey Ryan and Anton,
>>
>> I wanted to circle back on some findings I had after taking a first stab
>> at this ..
>>
>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
>>> what we want to take advantage of..
>>
>>
>> This is how I went about this... I wrapped the
>> *ParquetReader.FileIterator* with an iterator that creates Arrow batch
>> from rows (one batch for every 100 rows) returned by *FileIterator.next()
>> *. I exposed each Arrow Batch from this iterator as a ColumnarBatch
>> which has a * .rowIterator() *that reads this as a sequence of
>> InternalRow. I return this in the  *Reader.open() *call in Iceberg.
>>
>> Here are some microbenchmark numbers on on flat parquet file scanning ...
>>
>> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10
>> files
>>
>> *BenchmarkMode  Cnt   Score(sec)   Error Units*
>> readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
>> readIcebergNoBatchingavgt3   1.854± 2.003   s/op
>> readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
>> readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
>> readIcebergWithBatching5kavgt3  13.964± 6.328   s/op
>>
>>
>> The Batching doesn't seem to add any benefit. I measured the conversion
>> times and am reading this as the overhead from extra copies to Arrow and
>> then to ColumnarBatch again. Although I was hoping that the materialization
>> to arrow would offset some of that overhead.
>>
>> Wondering what my next step should be..
>> 1) Eliminate the extra conversion IO overhead by reading each column type
>> directly into ArrowColumnVector?
>> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
>> mixin and expose the ColumnarBatch?
>>
>>
>> Appreciate your guidance,
>>
>> -Gautam.
>>
>>
>>
>> On Fri, May 24, 2019 at 5:28 PM Ryan Blue 
>> wrote:
>>
>>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>>> Iterator[InternalRow] interface, it would still not work right? Coz it
>>> seems to me there is a lot more going on upstream in the operator execution
>>> path that would be needed to be done here.
>>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
>>> we want to take advantage of. You’re right that the first thing that Spark
>>> does it to get each row as InternalRow. But we still get a benefit from
>>> vectorizing the data materialization to Arrow itself. Spark execution is
>>> not vectori

Re: Approaching Vectorized Reading in Iceberg ..

2019-06-14 Thread Gautam
Hey Anton,
  Here's the code
https://github.com/prodeezy/incubator-iceberg/pull/2/files .. Mind you,
it's just a proof of concept to get something going so please ignore the
code design (or lack thereof :-) ).  I'v attached the flat data benchmark
code as well.

Lemme know what you think.







On Thu, Jun 13, 2019 at 10:56 PM Anton Okolnychyi 
wrote:

> Gautam, could you also share the code for benchmarks and conversion?
>
> Thanks,
> Anton
>
> On 13 Jun 2019, at 19:38, Ryan Blue  wrote:
>
> Sounds like a good start. I think the next step is to avoid using the
> ParquetReader.FileIterator and deserialize directly from TripleIterator
> .
> I think the reason why this is taking longer is that (I think) you’re doing
> all the work to materialize the data in rows, and then converting to
> vectors.
>
> To work on top of TripleIterator, I think you need to create a
> ParquetValueReader for Arrow batches. That would be configured with a
> batch size, so that when you pass it into ParquetReader, the FileIterator
> returns batches instead of individual rows.
>
> Does that make sense?
>
> rb
>
> On Wed, Jun 12, 2019 at 11:22 PM Gautam  wrote:
>
>> Hey Ryan and Anton,
>>
>> I wanted to circle back on some findings I had after taking a first stab
>> at this ..
>>
>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
>>> what we want to take advantage of..
>>
>>
>> This is how I went about this... I wrapped the
>> *ParquetReader.FileIterator* with an iterator that creates Arrow batch
>> from rows (one batch for every 100 rows) returned by *FileIterator.next()
>> *. I exposed each Arrow Batch from this iterator as a ColumnarBatch
>> which has a * .rowIterator() *that reads this as a sequence of
>> InternalRow. I return this in the  *Reader.open() *call in Iceberg.
>>
>> Here are some microbenchmark numbers on on flat parquet file scanning ...
>>
>> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10
>> files
>>
>> *BenchmarkMode  Cnt   Score(sec)   Error Units*
>> readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
>> readIcebergNoBatchingavgt3   1.854± 2.003   s/op
>> readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
>> readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
>> readIcebergWithBatching5kavgt3  13.964± 6.328   s/op
>>
>>
>> The Batching doesn't seem to add any benefit. I measured the conversion
>> times and am reading this as the overhead from extra copies to Arrow and
>> then to ColumnarBatch again. Although I was hoping that the materialization
>> to arrow would offset some of that overhead.
>>
>> Wondering what my next step should be..
>> 1) Eliminate the extra conversion IO overhead by reading each column type
>> directly into ArrowColumnVector?
>> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
>> mixin and expose the ColumnarBatch?
>>
>>
>> Appreciate your guidance,
>>
>> -Gautam.
>>
>>
>>
>> On Fri, May 24, 2019 at 5:28 PM Ryan Blue 
>> wrote:
>>
>>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>>> Iterator[InternalRow] interface, it would still not work right? Coz it
>>> seems to me there is a lot more going on upstream in the operator execution
>>> path that would be needed to be done here.
>>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
>>> we want to take advantage of. You’re right that the first thing that Spark
>>> does it to get each row as InternalRow. But we still get a benefit from
>>> vectorizing the data materialization to Arrow itself. Spark execution is
>>> not vectorized, but that can be updated in Spark later (I think there’s a
>>> proposal).
>>>
>>> I wouldn’t pay too much attention to the Parquet vectorized path in
>>> Spark because it produces its own in-memory format and not Arrow. We want
>>> to take advantage of Arrow so that we can use dictionary-encoded columns in
>>> record batches. Spark’s vectorized Parquet reader also works directly on
>>> Parquet pages instead of a higher-level abstraction. I’m not sure we are
>>> going to want to do that right away instead of using the TripleIterator
>>> that Iceberg currently uses to abstract the Parquet page structure away.
>>>
>>> I would start by building converters that can build a column of Arrow
>>> data from a TripleIterator. Then we can stitch those columns together to
>>> get record batches and see how that performs. Then we can add complexity
>>> from there.
>>>
>>> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
>>>
 Hello devs,
As a follow up to
 https://github.com/apache/incubator-iceberg/iss

Re: Approaching Vectorized Reading in Iceberg ..

2019-06-13 Thread Anton Okolnychyi
Gautam, could you also share the code for benchmarks and conversion?

Thanks,
Anton

> On 13 Jun 2019, at 19:38, Ryan Blue  wrote:
> 
> Sounds like a good start. I think the next step is to avoid using the 
> ParquetReader.FileIterator and deserialize directly from TripleIterator 
> .
>  I think the reason why this is taking longer is that (I think) you’re doing 
> all the work to materialize the data in rows, and then converting to vectors.
> 
> To work on top of TripleIterator, I think you need to create a 
> ParquetValueReader for Arrow batches. That would be configured with a batch 
> size, so that when you pass it into ParquetReader, the FileIterator returns 
> batches instead of individual rows.
> 
> Does that make sense?
> 
> rb
> 
> 
> On Wed, Jun 12, 2019 at 11:22 PM Gautam  > wrote:
> Hey Ryan and Anton,
> 
> I wanted to circle back on some findings I had after taking a first stab at 
> this ..
>  
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of..
> 
> This is how I went about this... I wrapped the ParquetReader.FileIterator 
> with an iterator that creates Arrow batch from rows (one batch for every 100 
> rows) returned by FileIterator.next() . I exposed each Arrow Batch from this 
> iterator as a ColumnarBatch which has a  .rowIterator() that reads this as a 
> sequence of InternalRow. I return this in the  Reader.open() call in Iceberg. 
> 
> Here are some microbenchmark numbers on on flat parquet file scanning ...
> 
> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10 files
> 
> BenchmarkMode  Cnt   Score(sec)   Error Units
> readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
> readIcebergNoBatchingavgt3   1.854± 2.003   s/op
> readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
> readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
> readIcebergWithBatching5kavgt3  13.964± 6.328   s/op
> 
> 
> The Batching doesn't seem to add any benefit. I measured the conversion times 
> and am reading this as the overhead from extra copies to Arrow and then to 
> ColumnarBatch again. Although I was hoping that the materialization to arrow 
> would offset some of that overhead.
> 
> Wondering what my next step should be.. 
> 1) Eliminate the extra conversion IO overhead by reading each column type 
> directly into ArrowColumnVector? 
> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch 
> mixin and expose the ColumnarBatch?
> 
> 
> Appreciate your guidance,
> 
> -Gautam.
> 
> 
> 
> On Fri, May 24, 2019 at 5:28 PM Ryan Blue  wrote:
> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an 
> Iterator[InternalRow] interface, it would still not work right? Coz it seems 
> to me there is a lot more going on upstream in the operator execution path 
> that would be needed to be done here.
> 
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of. You’re right that the first thing that Spark does 
> it to get each row as InternalRow. But we still get a benefit from 
> vectorizing the data materialization to Arrow itself. Spark execution is not 
> vectorized, but that can be updated in Spark later (I think there’s a 
> proposal).
> 
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark 
> because it produces its own in-memory format and not Arrow. We want to take 
> advantage of Arrow so that we can use dictionary-encoded columns in record 
> batches. Spark’s vectorized Parquet reader also works directly on Parquet 
> pages instead of a higher-level abstraction. I’m not sure we are going to 
> want to do that right away instead of using the TripleIterator that Iceberg 
> currently uses to abstract the Parquet page structure away.
> 
> I would start by building converters that can build a column of Arrow data 
> from a TripleIterator. Then we can stitch those columns together to get 
> record batches and see how that performs. Then we can add complexity from 
> there.
> 
> 
> On Fri, May 24, 2019 at 4:28 PM Gautam  > wrote:
> Hello devs,
>As a follow up to https://github.com/apache/incubator-iceberg/issues/9 
>  I'v been reading 
> through how Spark does vectorized reading in it's current implementation 
> which is in DataSource V1 path. Trying to see how we can achieve the same 
> impact in Iceberg's reading. To start with I want to form an understanding at 
> a high level of the approach one would need to take to achieve th

Re: Approaching Vectorized Reading in Iceberg ..

2019-06-13 Thread Ryan Blue
Sounds like a good start. I think the next step is to avoid using the
ParquetReader.FileIterator and deserialize directly from TripleIterator
.
I think the reason why this is taking longer is that (I think) you’re doing
all the work to materialize the data in rows, and then converting to
vectors.

To work on top of TripleIterator, I think you need to create a
ParquetValueReader for Arrow batches. That would be configured with a batch
size, so that when you pass it into ParquetReader, the FileIterator returns
batches instead of individual rows.

Does that make sense?

rb

On Wed, Jun 12, 2019 at 11:22 PM Gautam  wrote:

> Hey Ryan and Anton,
>
> I wanted to circle back on some findings I had after taking a first stab
> at this ..
>
>
>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
>> what we want to take advantage of..
>
>
> This is how I went about this... I wrapped the
> *ParquetReader.FileIterator* with an iterator that creates Arrow batch
> from rows (one batch for every 100 rows) returned by *FileIterator.next()
> *. I exposed each Arrow Batch from this iterator as a ColumnarBatch which
> has a * .rowIterator() *that reads this as a sequence of InternalRow. I
> return this in the  *Reader.open() *call in Iceberg.
>
> Here are some microbenchmark numbers on on flat parquet file scanning ...
>
> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10
> files
>
> *BenchmarkMode  Cnt   Score(sec)   Error Units*
> readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
> readIcebergNoBatchingavgt3   1.854± 2.003   s/op
> readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
> readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
> readIcebergWithBatching5kavgt3  13.964± 6.328   s/op
>
>
> The Batching doesn't seem to add any benefit. I measured the conversion
> times and am reading this as the overhead from extra copies to Arrow and
> then to ColumnarBatch again. Although I was hoping that the materialization
> to arrow would offset some of that overhead.
>
> Wondering what my next step should be..
> 1) Eliminate the extra conversion IO overhead by reading each column type
> directly into ArrowColumnVector?
> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
> mixin and expose the ColumnarBatch?
>
>
> Appreciate your guidance,
>
> -Gautam.
>
>
>
> On Fri, May 24, 2019 at 5:28 PM Ryan Blue 
> wrote:
>
>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>> Iterator[InternalRow] interface, it would still not work right? Coz it
>> seems to me there is a lot more going on upstream in the operator execution
>> path that would be needed to be done here.
>>
>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
>> we want to take advantage of. You’re right that the first thing that Spark
>> does it to get each row as InternalRow. But we still get a benefit from
>> vectorizing the data materialization to Arrow itself. Spark execution is
>> not vectorized, but that can be updated in Spark later (I think there’s a
>> proposal).
>>
>> I wouldn’t pay too much attention to the Parquet vectorized path in Spark
>> because it produces its own in-memory format and not Arrow. We want to take
>> advantage of Arrow so that we can use dictionary-encoded columns in record
>> batches. Spark’s vectorized Parquet reader also works directly on Parquet
>> pages instead of a higher-level abstraction. I’m not sure we are going to
>> want to do that right away instead of using the TripleIterator that Iceberg
>> currently uses to abstract the Parquet page structure away.
>>
>> I would start by building converters that can build a column of Arrow
>> data from a TripleIterator. Then we can stitch those columns together to
>> get record batches and see how that performs. Then we can add complexity
>> from there.
>>
>> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
>>
>>> Hello devs,
>>>As a follow up to
>>> https://github.com/apache/incubator-iceberg/issues/9 I'v been reading
>>> through how Spark does vectorized reading in it's current implementation
>>> which is in DataSource V1 path. Trying to see how we can achieve the same
>>> impact in Iceberg's reading. To start with I want to form an understanding
>>> at a high level of the approach one would need to take to achieve this.
>>> Pardon my ignorance as I'm equally new to Spark codebase as I am to
>>> Iceberg. Please correct me if my understanding is wrong.
>>>
>>> So here's what Vectorization seems to be doing for Parquet reading:
>>> - The DataSource scan execution uses ParquetFileFormat to build a
>>> Rec

Re: Approaching Vectorized Reading in Iceberg ..

2019-06-12 Thread Gautam
Hey Ryan and Anton,

I wanted to circle back on some findings I had after taking a first stab at
this ..


> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
> what we want to take advantage of..


This is how I went about this... I wrapped the *ParquetReader.FileIterator*
with an iterator that creates Arrow batch from rows (one batch for every
100 rows) returned by *FileIterator.next() *. I exposed each Arrow Batch
from this iterator as a ColumnarBatch which has a * .rowIterator() *that
reads this as a sequence of InternalRow. I return this in the  *Reader.open()
*call in Iceberg.

Here are some microbenchmark numbers on on flat parquet file scanning ...

1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10 files

*BenchmarkMode  Cnt   Score(sec)   Error Units*
readFileV1SourceVectorized   avgt3   0.870± 1.883   s/op
readIcebergNoBatchingavgt3   1.854± 2.003   s/op
readIcebergWithBatching100k  avgt3   3.216± 0.520   s/op
readIcebergWithBatching10k   avgt3   8.763± 2.338   s/op
readIcebergWithBatching5kavgt3  13.964± 6.328   s/op


The Batching doesn't seem to add any benefit. I measured the conversion
times and am reading this as the overhead from extra copies to Arrow and
then to ColumnarBatch again. Although I was hoping that the materialization
to arrow would offset some of that overhead.

Wondering what my next step should be..
1) Eliminate the extra conversion IO overhead by reading each column type
directly into ArrowColumnVector?
2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
mixin and expose the ColumnarBatch?


Appreciate your guidance,

-Gautam.



On Fri, May 24, 2019 at 5:28 PM Ryan Blue  wrote:

> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here.
>
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
> we want to take advantage of. You’re right that the first thing that Spark
> does it to get each row as InternalRow. But we still get a benefit from
> vectorizing the data materialization to Arrow itself. Spark execution is
> not vectorized, but that can be updated in Spark later (I think there’s a
> proposal).
>
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark
> because it produces its own in-memory format and not Arrow. We want to take
> advantage of Arrow so that we can use dictionary-encoded columns in record
> batches. Spark’s vectorized Parquet reader also works directly on Parquet
> pages instead of a higher-level abstraction. I’m not sure we are going to
> want to do that right away instead of using the TripleIterator that Iceberg
> currently uses to abstract the Parquet page structure away.
>
> I would start by building converters that can build a column of Arrow data
> from a TripleIterator. Then we can stitch those columns together to get
> record batches and see how that performs. Then we can add complexity from
> there.
>
> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
>
>> Hello devs,
>>As a follow up to
>> https://github.com/apache/incubator-iceberg/issues/9 I'v been reading
>> through how Spark does vectorized reading in it's current implementation
>> which is in DataSource V1 path. Trying to see how we can achieve the same
>> impact in Iceberg's reading. To start with I want to form an understanding
>> at a high level of the approach one would need to take to achieve this.
>> Pardon my ignorance as I'm equally new to Spark codebase as I am to
>> Iceberg. Please correct me if my understanding is wrong.
>>
>> So here's what Vectorization seems to be doing for Parquet reading:
>> - The DataSource scan execution uses ParquetFileFormat to build a
>> RecordReaderIterator [1] which underneath uses the
>> VectorizedParquetReaderReader.
>> - This record reader is used to iterate over entire batches of columns
>> (ColumnarBatch). The iterator.next() call returns a batch and not just a
>> row. The interfaces are such that allow an ColumnarBatch to be passed
>> around as a generic Object. As stated here [2]
>> - On the scan execution side, there is stage Code Generation that
>> compiles code that consumes entire batches at time so that physical
>> operators take advantage of the vectorization feature. So the scanner code
>> is aware that it's reading columnar batches out of the iterator.
>>
>>
>> I'm wondering how one should approach this if one is to achieve
>> Vectorization in Iceberg Reader (DatasourceV2) path. For instance, if
>> Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>> Iterator[InternalRow] in

Re: Approaching Vectorized Reading in Iceberg ..

2019-05-28 Thread Ryan Blue
Correct.

On Tue, May 28, 2019 at 3:13 PM Anton Okolnychyi 
wrote:

> Alright, so we are talking about reading Parquet data into
> ArrowRecordBatches and then exposing them as ColumnarBatches in Spark,
> where Spark ColumnVectors actually wrap Arrow FieldVectors, correct?
>
> - Anton
>
> > On 28 May 2019, at 21:24, Ryan Blue  wrote:
> >
> > From a performance viewpoint, this isn’t a great solution. The row by
> row approach will substantially hurt performance compared to the vectorized
> reader. I’ve seen 30% or more speed up when removing row-by-row access. So
> putting a row-by-row adapter in the middle of two vectorized
> representations is pretty costly.
> >
> > Iceberg doesn’t impose this requirement, it is how Spark consumes the
> rows itself, one at a time:
> https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L138
> >
> > By exposing Arrow data as Spark’s ColumnarBatch, we should pick up any
> benefits from improved execution when Spark is updated.
> >
> >
> > On Tue, May 28, 2019 at 12:33 PM Owen O'Malley 
> wrote:
> >
> >
> > On Fri, May 24, 2019 at 8:28 PM Ryan Blue 
> wrote:
> > if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here.
> >
> > There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
> we want to take advantage of. You’re right that the first thing that Spark
> does it to get each row as InternalRow. But we still get a benefit from
> vectorizing the data materialization to Arrow itself. Spark execution is
> not vectorized, but that can be updated in Spark later (I think there’s a
> proposal).
> >
> > From a performance viewpoint, this isn't a great solution. The row by
> row approach will substantially hurt performance compared to the vectorized
> reader. I've seen 30% or more speed up when removing row-by-row access. So
> putting a row-by-row adapter in the middle of two vectorized
> representations is pretty costly.
> >
> > .. Owen
> >
> >
> >
> > --
> > Ryan Blue
> > Software Engineer
> > Netflix
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Approaching Vectorized Reading in Iceberg ..

2019-05-28 Thread Anton Okolnychyi
Alright, so we are talking about reading Parquet data into ArrowRecordBatches 
and then exposing them as ColumnarBatches in Spark, where Spark ColumnVectors 
actually wrap Arrow FieldVectors, correct?

- Anton

> On 28 May 2019, at 21:24, Ryan Blue  wrote:
> 
> From a performance viewpoint, this isn’t a great solution. The row by row 
> approach will substantially hurt performance compared to the vectorized 
> reader. I’ve seen 30% or more speed up when removing row-by-row access. So 
> putting a row-by-row adapter in the middle of two vectorized representations 
> is pretty costly.
> 
> Iceberg doesn’t impose this requirement, it is how Spark consumes the rows 
> itself, one at a time: 
> https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L138
> 
> By exposing Arrow data as Spark’s ColumnarBatch, we should pick up any 
> benefits from improved execution when Spark is updated.
> 
> 
> On Tue, May 28, 2019 at 12:33 PM Owen O'Malley  wrote:
> 
> 
> On Fri, May 24, 2019 at 8:28 PM Ryan Blue  wrote:
> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an 
> Iterator[InternalRow] interface, it would still not work right? Coz it seems 
> to me there is a lot more going on upstream in the operator execution path 
> that would be needed to be done here.
> 
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of. You’re right that the first thing that Spark does 
> it to get each row as InternalRow. But we still get a benefit from 
> vectorizing the data materialization to Arrow itself. Spark execution is not 
> vectorized, but that can be updated in Spark later (I think there’s a 
> proposal).
> 
> From a performance viewpoint, this isn't a great solution. The row by row 
> approach will substantially hurt performance compared to the vectorized 
> reader. I've seen 30% or more speed up when removing row-by-row access. So 
> putting a row-by-row adapter in the middle of two vectorized representations 
> is pretty costly.
> 
> .. Owen
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix



Re: Approaching Vectorized Reading in Iceberg ..

2019-05-28 Thread Ryan Blue
>From a performance viewpoint, this isn’t a great solution. The row by row
approach will substantially hurt performance compared to the vectorized
reader. I’ve seen 30% or more speed up when removing row-by-row access. So
putting a row-by-row adapter in the middle of two vectorized
representations is pretty costly.

Iceberg doesn’t impose this requirement, it is how Spark consumes the rows
itself, one at a time:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L138

By exposing Arrow data as Spark’s ColumnarBatch, we should pick up any
benefits from improved execution when Spark is updated.

On Tue, May 28, 2019 at 12:33 PM Owen O'Malley 
wrote:

>
>
> On Fri, May 24, 2019 at 8:28 PM Ryan Blue 
> wrote:
>
>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>> Iterator[InternalRow] interface, it would still not work right? Coz it
>> seems to me there is a lot more going on upstream in the operator execution
>> path that would be needed to be done here.
>>
>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
>> we want to take advantage of. You’re right that the first thing that Spark
>> does it to get each row as InternalRow. But we still get a benefit from
>> vectorizing the data materialization to Arrow itself. Spark execution is
>> not vectorized, but that can be updated in Spark later (I think there’s a
>> proposal).
>>
> From a performance viewpoint, this isn't a great solution. The row by row
> approach will substantially hurt performance compared to the vectorized
> reader. I've seen 30% or more speed up when removing row-by-row access. So
> putting a row-by-row adapter in the middle of two vectorized
> representations is pretty costly.
>
> .. Owen
>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: Approaching Vectorized Reading in Iceberg ..

2019-05-28 Thread Owen O'Malley
On Fri, May 24, 2019 at 8:28 PM Ryan Blue  wrote:

> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here.
>
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
> we want to take advantage of. You’re right that the first thing that Spark
> does it to get each row as InternalRow. But we still get a benefit from
> vectorizing the data materialization to Arrow itself. Spark execution is
> not vectorized, but that can be updated in Spark later (I think there’s a
> proposal).
>
>From a performance viewpoint, this isn't a great solution. The row by row
approach will substantially hurt performance compared to the vectorized
reader. I've seen 30% or more speed up when removing row-by-row access. So
putting a row-by-row adapter in the middle of two vectorized
representations is pretty costly.

.. Owen


Re: Approaching Vectorized Reading in Iceberg ..

2019-05-28 Thread Anton Okolnychyi
>  You’re right that the first thing that Spark does it to get each row as 
> InternalRow. But we still get a benefit from vectorizing the data 
> materialization to Arrow itself. Spark execution is not vectorized, but that 
> can be updated in Spark later (I think there’s a proposal).
> 
I am not sure I fully understand this statement. Ryan, could you clarify it?

To the best of my knowledge, Spark reads each column separately and constructs 
ColumnarBatches, which hold a ColumnVector for every column. Spark leverages 
MutableColumnarRow (which extends InternalRow) to expose data in columnar 
batches as rows. So, we don’t really create rows. 

In 2.4, data sources that implement SupportsScanColumnarBatch can expose data 
as ColumnarBatches. This will be taken into account by DataSourceV2ScanExec 
that implements ColumnarBatchScan. The latter is responsible for producing code 
that uses batches of data, which allows Spark to benefit from SIMD/instruction 
pipelining and so on.

Is our plan to read Parquet data into Arrow and then convert it into 
ColumnarBatch? And once Spark truly supports Arrow (I still have to go though 
that proposal) we will get rid of this conversion?

Thanks,
Anton

> On 25 May 2019, at 02:44, Gautam  wrote:
> 
> > There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> > iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what 
> > we want to take advantage of. You’re right that the first thing that Spark 
> > does it to get each row as InternalRow. But we still get a benefit from 
> > vectorizing the data materialization to Arrow itself. Spark execution is 
> > not vectorized, but that can be updated in Spark later (I think there’s a 
> > proposal).
> 
> Got it. So to clarify.. for starters we may not get the *entire* benefit from 
> vectorization (coz spark's operator execution still needs to take advantage 
> of Arrow upstream), but by reading Parquet into Arrow and streaming it over 
> Iterator as a stream we benefit from the columnar materialization. 
> 
> >  I’m not sure we are going to want to do that right away instead of using 
> > the TripleIterator that Iceberg currently uses to abstract the Parquet page 
> > structure away.
> 
> I agree overall with the Arrow approach, was looking at Spark's vectorization 
> just to understand how the speed up happens and what the contracts should be 
> in the Iceberg flow when we do that using Arrow. My main concern was, even if 
> I did read this into Arrow, if the transfer is row-by-row we might be losing 
> the whole point of using an in-memory columnar format.  But as you said in 
> the first point that we still get a lot of the benefit with just organizing 
> columns in memory this way. 
> 
> 
> 
> 
> 
> 
> On Fri, May 24, 2019 at 5:28 PM Ryan Blue  wrote:
> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an 
> Iterator[InternalRow] interface, it would still not work right? Coz it seems 
> to me there is a lot more going on upstream in the operator execution path 
> that would be needed to be done here.
> 
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of. You’re right that the first thing that Spark does 
> it to get each row as InternalRow. But we still get a benefit from 
> vectorizing the data materialization to Arrow itself. Spark execution is not 
> vectorized, but that can be updated in Spark later (I think there’s a 
> proposal).
> 
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark 
> because it produces its own in-memory format and not Arrow. We want to take 
> advantage of Arrow so that we can use dictionary-encoded columns in record 
> batches. Spark’s vectorized Parquet reader also works directly on Parquet 
> pages instead of a higher-level abstraction. I’m not sure we are going to 
> want to do that right away instead of using the TripleIterator that Iceberg 
> currently uses to abstract the Parquet page structure away.
> 
> I would start by building converters that can build a column of Arrow data 
> from a TripleIterator. Then we can stitch those columns together to get 
> record batches and see how that performs. Then we can add complexity from 
> there.
> 
> 
> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
> Hello devs,
>As a follow up to https://github.com/apache/incubator-iceberg/issues/9 
> I'v been reading through how Spark does vectorized reading in it's current 
> implementation which is in DataSource V1 path. Trying to see how we can 
> achieve the same impact in Iceberg's reading. To start with I want to form an 
> understanding at a high level of the approach one would need to take to 
> achieve this. Pardon my ignorance as I'm equally new to Spark codebase as I 
> am to Iceberg. Please correct me if my understanding is wrong.
> 
> So here's what Vectorization seems to be doing for Parque

Re: Approaching Vectorized Reading in Iceberg ..

2019-05-24 Thread Gautam
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
we want to take advantage of. You’re right that the first thing that Spark
does it to get each row as InternalRow. But we still get a benefit from
vectorizing the data materialization to Arrow itself. Spark execution is
not vectorized, but that can be updated in Spark later (I think there’s a
proposal).

Got it. So to clarify.. for starters we may not get the *entire* benefit
from vectorization (coz spark's operator execution still needs to take
advantage of Arrow upstream), but by reading Parquet into Arrow and
streaming it over Iterator as a stream we benefit from the
columnar materialization.

>  I’m not sure we are going to want to do that right away instead of using
the TripleIterator that Iceberg currently uses to abstract the Parquet page
structure away.

I agree overall with the Arrow approach, was looking at Spark's
vectorization just to understand how the speed up happens and what the
contracts should be in the Iceberg flow when we do that using Arrow. My
main concern was, even if I did read this into Arrow, if the transfer is
row-by-row we might be losing the whole point of using an in-memory
columnar format.  But as you said in the first point that we still get a
lot of the benefit with just organizing columns in memory this way.






On Fri, May 24, 2019 at 5:28 PM Ryan Blue  wrote:

> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here.
>
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
> we want to take advantage of. You’re right that the first thing that Spark
> does it to get each row as InternalRow. But we still get a benefit from
> vectorizing the data materialization to Arrow itself. Spark execution is
> not vectorized, but that can be updated in Spark later (I think there’s a
> proposal).
>
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark
> because it produces its own in-memory format and not Arrow. We want to take
> advantage of Arrow so that we can use dictionary-encoded columns in record
> batches. Spark’s vectorized Parquet reader also works directly on Parquet
> pages instead of a higher-level abstraction. I’m not sure we are going to
> want to do that right away instead of using the TripleIterator that Iceberg
> currently uses to abstract the Parquet page structure away.
>
> I would start by building converters that can build a column of Arrow data
> from a TripleIterator. Then we can stitch those columns together to get
> record batches and see how that performs. Then we can add complexity from
> there.
>
> On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:
>
>> Hello devs,
>>As a follow up to
>> https://github.com/apache/incubator-iceberg/issues/9 I'v been reading
>> through how Spark does vectorized reading in it's current implementation
>> which is in DataSource V1 path. Trying to see how we can achieve the same
>> impact in Iceberg's reading. To start with I want to form an understanding
>> at a high level of the approach one would need to take to achieve this.
>> Pardon my ignorance as I'm equally new to Spark codebase as I am to
>> Iceberg. Please correct me if my understanding is wrong.
>>
>> So here's what Vectorization seems to be doing for Parquet reading:
>> - The DataSource scan execution uses ParquetFileFormat to build a
>> RecordReaderIterator [1] which underneath uses the
>> VectorizedParquetReaderReader.
>> - This record reader is used to iterate over entire batches of columns
>> (ColumnarBatch). The iterator.next() call returns a batch and not just a
>> row. The interfaces are such that allow an ColumnarBatch to be passed
>> around as a generic Object. As stated here [2]
>> - On the scan execution side, there is stage Code Generation that
>> compiles code that consumes entire batches at time so that physical
>> operators take advantage of the vectorization feature. So the scanner code
>> is aware that it's reading columnar batches out of the iterator.
>>
>>
>> I'm wondering how one should approach this if one is to achieve
>> Vectorization in Iceberg Reader (DatasourceV2) path. For instance, if
>> Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>> Iterator[InternalRow] interface, it would still not work right? Coz it
>> seems to me there is a lot more going on upstream in the operator execution
>> path that would be needed to be done here. It would be great if folks who
>> are more well-versed with the Spark codebase shed some light on this. In
>> general, what is the contract needed between V2 DataSourceReader (like
>> Iceberg) and the operator execution?
>>
>> thank 

Re: Approaching Vectorized Reading in Iceberg ..

2019-05-24 Thread Ryan Blue
if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
Iterator[InternalRow] interface, it would still not work right? Coz it
seems to me there is a lot more going on upstream in the operator execution
path that would be needed to be done here.

There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
we want to take advantage of. You’re right that the first thing that Spark
does it to get each row as InternalRow. But we still get a benefit from
vectorizing the data materialization to Arrow itself. Spark execution is
not vectorized, but that can be updated in Spark later (I think there’s a
proposal).

I wouldn’t pay too much attention to the Parquet vectorized path in Spark
because it produces its own in-memory format and not Arrow. We want to take
advantage of Arrow so that we can use dictionary-encoded columns in record
batches. Spark’s vectorized Parquet reader also works directly on Parquet
pages instead of a higher-level abstraction. I’m not sure we are going to
want to do that right away instead of using the TripleIterator that Iceberg
currently uses to abstract the Parquet page structure away.

I would start by building converters that can build a column of Arrow data
from a TripleIterator. Then we can stitch those columns together to get
record batches and see how that performs. Then we can add complexity from
there.

On Fri, May 24, 2019 at 4:28 PM Gautam  wrote:

> Hello devs,
>As a follow up to
> https://github.com/apache/incubator-iceberg/issues/9 I'v been reading
> through how Spark does vectorized reading in it's current implementation
> which is in DataSource V1 path. Trying to see how we can achieve the same
> impact in Iceberg's reading. To start with I want to form an understanding
> at a high level of the approach one would need to take to achieve this.
> Pardon my ignorance as I'm equally new to Spark codebase as I am to
> Iceberg. Please correct me if my understanding is wrong.
>
> So here's what Vectorization seems to be doing for Parquet reading:
> - The DataSource scan execution uses ParquetFileFormat to build a
> RecordReaderIterator [1] which underneath uses the
> VectorizedParquetReaderReader.
> - This record reader is used to iterate over entire batches of columns
> (ColumnarBatch). The iterator.next() call returns a batch and not just a
> row. The interfaces are such that allow an ColumnarBatch to be passed
> around as a generic Object. As stated here [2]
> - On the scan execution side, there is stage Code Generation that compiles
> code that consumes entire batches at time so that physical operators take
> advantage of the vectorization feature. So the scanner code is aware that
> it's reading columnar batches out of the iterator.
>
>
> I'm wondering how one should approach this if one is to achieve
> Vectorization in Iceberg Reader (DatasourceV2) path. For instance, if
> Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
> Iterator[InternalRow] interface, it would still not work right? Coz it
> seems to me there is a lot more going on upstream in the operator execution
> path that would be needed to be done here. It would be great if folks who
> are more well-versed with the Spark codebase shed some light on this. In
> general, what is the contract needed between V2 DataSourceReader (like
> Iceberg) and the operator execution?
>
> thank you,
> -Gautam.
>
>
> [1] -
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
> [2] -
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29
>


-- 
Ryan Blue
Software Engineer
Netflix