Hey Guys,
           Sorry bout the delay on this. Just got back on getting a basic
working implementation in Iceberg for Vectorization on primitive types.

*Here's what I have so far :  *

I have added `ParquetValueReader` implementations for some basic primitive
types that build the respective Arrow Vector (`ValueVector`) viz.
`IntVector` for int, `VarCharVector` for strings and so on. Underneath each
value vector reader there are column iterators that read from the parquet
pagestores (rowgroups) in chunks. These `ValueVector-s` are lined up as
`ArrowColumnVector`-s (which is ColumnVector wrapper backed by Arrow) and
stitched together using a `ColumnarBatchReader` (which as the name suggests
wraps ColumnarBatches in the iterator)   I'v verified that these pieces
work properly with the underlying interfaces.  I'v also made changes to
Iceberg's `Reader` to  implement `planBatchPartitions()` (to add the
`SupportsScanColumnarBatch` mixin to the reader).  So the reader now
expects ColumnarBatch instances (instead of InternalRow). The query
planning runtime works fine with these changes.

Although it fails during query execution, the bit it's  currently failing
at is this line of code :
https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414

This code, I think,  tries to apply the iterator's schema projection on the
InternalRow instances. This seems to be tightly coupled to InternalRow as
Spark's catalyst expressions have implemented the UnsafeProjection for
InternalRow only. If I take this out and just return the
`Iterator<ColumnarBatch>` iterator I built it returns empty result on the
client. I'm guessing this is coz Spark is unaware of the iterator's schema?
There's a Todo in the code that says "*remove the projection by reporting
the iterator's schema back to Spark*".  Is there a simple way to
communicate that to Spark for my new iterator? Any pointers on how to get
around this?


Thanks and Regards,
-Gautam.




On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:

> Replies inline.
>
> On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:
>
>> Thanks for responding Ryan,
>>
>> Couple of follow up questions on ParquetValueReader for Arrow..
>>
>> I'd like to start with testing Arrow out with readers for primitive type
>> and incrementally add in Struct/Array support, also ArrowWriter [1]
>> currently doesn't have converters for map type. How can I default these
>> types to regular materialization whilst supporting Arrow based support for
>> primitives?
>>
>
> We should look at what Spark does to handle maps.
>
> I think we should get the prototype working with test cases that don't
> have maps, structs, or lists. Just getting primitives working is a good
> start and just won't hit these problems.
>
>
>> Lemme know if this makes sense...
>>
>> - I extend  PrimitiveReader (for Arrow) that loads primitive types into
>> ArrowColumnVectors of corresponding column types by iterating over
>> underlying ColumnIterator *n times*, where n is size of batch.
>>
>
> Sounds good to me. I'm not sure about extending vs wrapping because I'm
> not too familiar with the Arrow APIs.
>
>
>> - Reader.newParquetIterable()  maps primitive column types to the newly
>> added ArrowParquetValueReader but for other types (nested types, etc.) uses
>> current *InternalRow* based ValueReaders
>>
>
> Sounds good for primitives, but I would just leave the nested types
> un-implemented for now.
>
>
>> - Stitch the columns vectors together to create ColumnarBatch, (Since
>> *SupportsScanColumnarBatch* mixin currently expects this ) .. *although* *I'm
>> a bit lost on how the stitching of columns happens currently*? .. and
>> how the ArrowColumnVectors could  be stitched alongside regular columns
>> that don't have arrow based support ?
>>
>
> I don't think that you can mix regular columns and Arrow columns. It has
> to be all one or the other. That's why it's easier to start with
> primitives, then add structs, then lists, and finally maps.
>
>
>> - Reader returns readTasks as  *InputPartition<*ColumnarBatch*> *so that
>> DataSourceV2ScanExec starts using ColumnarBatch scans
>>
>
> We will probably need two paths. One for columnar batches and one for
> row-based reads. That doesn't need to be done right away and what you
> already have in your working copy makes sense as a start.
>
>
>> That's a lot of questions! :-) but hope i'm making sense.
>>
>> -Gautam.
>>
>>
>>
>> [1] -
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Reply via email to