Figured this out. I'm returning the ColumnarBatch iterator directly, without
the projection, with the schema set appropriately in `readSchema()`. The empty
result was due to valuesRead not being set correctly on FileIterator. I fixed
that and things are working. Will circle back with numbers soon.
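
For anyone following along, here is a minimal sketch of the kind of row accounting involved. It is not the Iceberg code: `Batch`, `FileBatchIterator`, and `countRows` are hypothetical stand-ins written in plain Java, and it assumes that `hasNext()` is driven by comparing a `valuesRead` counter against the total row count. Under that assumption, a counter that is not advanced by the number of rows in each batch can make the scan stop early (or never start), which shows up as an empty or truncated result.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

class BatchIteratorSketch {

  /** Hypothetical stand-in for a columnar batch: a slice of one int column. */
  static final class Batch {
    final int[] values;

    Batch(int[] values) {
      this.values = values;
    }

    int numRows() {
      return values.length;
    }
  }

  /** Hypothetical file iterator that hands out batches of at most batchSize rows. */
  static final class FileBatchIterator implements Iterator<Batch> {
    private final int[] column;
    private final int batchSize;
    private long valuesRead = 0; // rows already handed to the caller

    FileBatchIterator(int[] column, int batchSize) {
      this.column = column;
      this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
      // The scan ends as soon as the counter reaches the total row count.
      return valuesRead < column.length;
    }

    @Override
    public Batch next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      int start = (int) valuesRead;
      int rows = Math.min(batchSize, column.length - start);
      int[] slice = Arrays.copyOfRange(column, start, start + rows);
      // Advance by the rows in this batch, not by one and not by batchSize.
      valuesRead += rows;
      return new Batch(slice);
    }
  }

  /** Drains the iterator and returns the total number of rows seen. */
  static long countRows(Iterator<Batch> batches) {
    long total = 0;
    while (batches.hasNext()) {
      total += batches.next().numRows();
    }
    return total;
  }

  public static void main(String[] args) {
    int[] column = new int[10];
    for (int i = 0; i < column.length; i++) {
      column[i] = i;
    }
    // Batches of 4, 4 and 2 rows.
    System.out.println(countRows(new FileBatchIterator(column, 4))); // prints 10
  }
}
```

The point of the sketch is only that the counter has to track rows per batch once the iterator returns batches instead of single rows.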

On Fri, Jul 19, 2019 at 5:22 PM Gautam <gautamkows...@gmail.com> wrote:

> Hey Guys,
>            Sorry about the delay on this. I just got back to getting a basic
> working implementation in Iceberg for vectorization on primitive types.
>
> *Here's what I have so far:*
>
> I have added `ParquetValueReader` implementations for some basic primitive
> types that build the respective Arrow vector (`ValueVector`), viz.
> `IntVector` for int, `VarCharVector` for strings, and so on. Underneath each
> value vector reader there are column iterators that read from the Parquet
> page stores (row groups) in chunks. These `ValueVector`-s are lined up as
> `ArrowColumnVector`-s (a `ColumnVector` wrapper backed by Arrow) and
> stitched together using a `ColumnarBatchReader` (which, as the name suggests,
> wraps ColumnarBatches in the iterator). I've verified that these pieces
> work properly with the underlying interfaces. I've also made changes to
> Iceberg's `Reader` to implement `planBatchPartitions()` (by adding the
> `SupportsScanColumnarBatch` mixin to the reader), so the reader now
> expects ColumnarBatch instances (instead of InternalRow). The query
> planning runtime works fine with these changes.
>
> However, it fails during query execution. The bit it's currently failing
> at is this line of code:
> https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java#L414
>
> This code, I think, tries to apply the iterator's schema projection on
> the InternalRow instances. This seems to be tightly coupled to InternalRow,
> as Spark's Catalyst expressions have implemented UnsafeProjection for
> InternalRow only. If I take this out and just return the
> `Iterator<ColumnarBatch>` I built, it returns an empty result on the
> client. I'm guessing this is because Spark is unaware of the iterator's
> schema? There's a TODO in the code that says "*remove the projection by
> reporting the iterator's schema back to Spark*". Is there a simple way to
> communicate that to Spark for my new iterator? Any pointers on how to get
> around this?
>
>
> Thanks and Regards,
> -Gautam.
>
>
>
>
> On Fri, Jun 14, 2019 at 4:22 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> Replies inline.
>>
>> On Fri, Jun 14, 2019 at 1:11 AM Gautam <gautamkows...@gmail.com> wrote:
>>
>>> Thanks for responding Ryan,
>>>
>>> A couple of follow-up questions on ParquetValueReader for Arrow:
>>>
>>> I'd like to start by testing Arrow out with readers for primitive types
>>> and incrementally add Struct/Array support. Also, ArrowWriter [1]
>>> currently doesn't have converters for the map type. How can I default these
>>> types to regular materialization while using Arrow-based reads for
>>> primitives?
>>>
>>
>> We should look at what Spark does to handle maps.
>>
>> I think we should get the prototype working with test cases that don't
>> have maps, structs, or lists. Just getting primitives working is a good
>> start and just won't hit these problems.
>>
>>
>>> Lemme know if this makes sense...
>>>
>>> - I extend PrimitiveReader (for Arrow) so that it loads primitive types into
>>> ArrowColumnVectors of the corresponding column types by iterating over the
>>> underlying ColumnIterator *n times*, where n is the batch size.
>>>
>>
>> Sounds good to me. I'm not sure about extending vs wrapping because I'm
>> not too familiar with the Arrow APIs.
>>
>>
>>> - Reader.newParquetIterable() maps primitive column types to the newly
>>> added ArrowParquetValueReader, but for other types (nested types, etc.) uses
>>> the current *InternalRow*-based ValueReaders
>>>
>>
>> Sounds good for primitives, but I would just leave the nested types
>> un-implemented for now.
>>
>>
>>> - Stitch the column vectors together to create a ColumnarBatch (since the
>>> *SupportsScanColumnarBatch* mixin currently expects this), *although
>>> I'm a bit lost on how the stitching of columns happens currently*, and
>>> how the ArrowColumnVectors could be stitched alongside regular columns
>>> that don't have Arrow-based support?
>>>
>>
>> I don't think that you can mix regular columns and Arrow columns. It has
>> to be all one or the other. That's why it's easier to start with
>> primitives, then add structs, then lists, and finally maps.
>>
>>
>>> - Reader returns readTasks as *InputPartition<ColumnarBatch>* so
>>> that DataSourceV2ScanExec starts using ColumnarBatch scans
>>>
>>
>> We will probably need two paths. One for columnar batches and one for
>> row-based reads. That doesn't need to be done right away and what you
>> already have in your working copy makes sense as a start.
>>
>>
>>> That's a lot of questions! :-) but I hope I'm making sense.
>>>
>>> -Gautam.
>>>
>>>
>>>
>>> [1] -
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
