Hey Anton,
Here's the code:
https://github.com/prodeezy/incubator-iceberg/pull/2/files. Mind you,
it's just a proof of concept to get something going, so please ignore the
code design (or lack thereof :-) ). I've attached the flat data benchmark
code as well.

Lemme know what you think.







On Thu, Jun 13, 2019 at 10:56 PM Anton Okolnychyi <aokolnyc...@apple.com>
wrote:

> Gautam, could you also share the code for benchmarks and conversion?
>
> Thanks,
> Anton
>
> On 13 Jun 2019, at 19:38, Ryan Blue <rb...@netflix.com.INVALID> wrote:
>
> Sounds like a good start. I think the next step is to avoid using the
> ParquetReader.FileIterator and deserialize directly from TripleIterator
> <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java>.
> I suspect this is taking longer because you're doing all the work to
> materialize the data in rows, and then converting to vectors.
>
> To work on top of TripleIterator, I think you need to create a
> ParquetValueReader for Arrow batches. That would be configured with a
> batch size, so that when you pass it into ParquetReader, the FileIterator
> returns batches instead of individual rows.
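A minimal sketch of the batch-size idea above, in plain Python rather than the actual Iceberg classes. `ColumnBatchReader` and its parameters are hypothetical names, the per-column iterators stand in for whatever the triple iterators would feed, and plain lists stand in for Arrow vectors. It assumes all columns yield the same number of values.

```python
from itertools import islice
from typing import Any, Dict, Iterable, List


class ColumnBatchReader:
    """Hypothetical reader: each next() returns up to batch_size values per column."""

    def __init__(self, columns: Dict[str, Iterable[Any]], batch_size: int):
        # One independent value stream per column; no row is ever materialized.
        self.columns = {name: iter(values) for name, values in columns.items()}
        self.batch_size = batch_size

    def __iter__(self) -> "ColumnBatchReader":
        return self

    def __next__(self) -> Dict[str, List[Any]]:
        # Fill each column buffer directly from its own stream.
        batch = {
            name: list(islice(stream, self.batch_size))
            for name, stream in self.columns.items()
        }
        # Stop once every column is exhausted (assumes equal-length columns).
        if not any(batch.values()):
            raise StopIteration
        return batch
```

The point of the shape is that the consumer of the iterator sees whole batches, so the row-by-row step disappears from the read path instead of being wrapped after the fact.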
>
> Does that make sense?
>
> rb
>
> On Wed, Jun 12, 2019 at 11:22 PM Gautam <gautamkows...@gmail.com> wrote:
>
>> Hey Ryan and Anton,
>>
>> I wanted to circle back on some findings I had after taking a first stab
>> at this ..
>>
>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s
>>> what we want to take advantage of..
>>
>>
>> This is how I went about it. I wrapped the
>> *ParquetReader.FileIterator* with an iterator that creates an Arrow batch
>> from the rows (one batch for every 100 rows) returned by
>> *FileIterator.next()*. I exposed each Arrow batch from this iterator as a
>> ColumnarBatch, which has a *.rowIterator()* that reads it as a sequence of
>> InternalRow. I return this from the *Reader.open()* call in Iceberg.
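For readers following along, the wrapping described above can be sketched roughly like this. It is plain Python, not the code in the PR: the function names are hypothetical, tuples stand in for InternalRow, and lists stand in for Arrow vectors. It only shows the data movement (rows to columns, then columns back to rows).

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Row = Tuple[Any, ...]


def rows_to_batches(
    rows: Iterable[Row], names: List[str], batch_size: int
) -> Iterator[Dict[str, List[Any]]]:
    """Group rows into column-oriented batches (copy 1: row -> column)."""
    stream = iter(rows)
    while True:
        chunk = list(islice(stream, batch_size))
        if not chunk:
            return
        # Transpose the chunk: one list per column.
        yield {name: [row[i] for row in chunk] for i, name in enumerate(names)}


def batch_row_iterator(batch: Dict[str, List[Any]]) -> Iterator[Row]:
    """Read a column batch back as rows (copy 2: column -> row)."""
    return zip(*batch.values())


def read_rows(rows: Iterable[Row], names: List[str], batch_size: int) -> Iterator[Row]:
    """What the scan ultimately consumes: rows again, after two conversions."""
    for batch in rows_to_batches(rows, names, batch_size):
        yield from batch_row_iterator(batch)
```

Every value is touched once to build the row, once to move it into a column, and once more to hand it back as a row, which is consistent with the extra-copy overhead discussed in this thread.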
>>
>> Here are some microbenchmark numbers on flat Parquet file scanning:
>>
>> 1 warmup, 3 iterations, 10 columns per row , 100k records per file, 10
>> files
>>
>> *Benchmark                    Mode  Cnt   Score(sec)   Error     Units*
>> readFileV1SourceVectorized   avgt    3   0.870        ± 1.883   s/op
>> readIcebergNoBatching        avgt    3   1.854        ± 2.003   s/op
>> readIcebergWithBatching100k  avgt    3   3.216        ± 0.520   s/op
>> readIcebergWithBatching10k   avgt    3   8.763        ± 2.338   s/op
>> readIcebergWithBatching5k    avgt    3  13.964        ± 6.328   s/op
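To put the table above in relative terms, here is a small calculation of the slowdown against the vectorized V1 baseline and the implied record throughput. The scores are copied from the table; the assumption that one operation scans all 10 files (1M records) is mine and is not stated in the thread. With only 3 iterations and error bars this wide, the ratios are rough at best.

```python
# Mean scores in seconds per operation, copied from the benchmark table.
scores = {
    "readFileV1SourceVectorized": 0.870,
    "readIcebergNoBatching": 1.854,
    "readIcebergWithBatching100k": 3.216,
    "readIcebergWithBatching10k": 8.763,
    "readIcebergWithBatching5k": 13.964,
}

# Assumption: each operation reads 100k records from each of the 10 files.
RECORDS_PER_OP = 100_000 * 10

baseline = scores["readFileV1SourceVectorized"]
slowdown = {name: score / baseline for name, score in scores.items()}
throughput = {name: RECORDS_PER_OP / score for name, score in scores.items()}

for name, score in scores.items():
    print(
        f"{name:28s} {score:7.3f} s/op "
        f"{slowdown[name]:6.2f}x baseline "
        f"{throughput[name] / 1e6:5.2f} M records/s"
    )
```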
>>
>>
>> Batching doesn't seem to add any benefit. I measured the conversion
>> times and am reading this as the overhead from the extra copies to Arrow
>> and then to ColumnarBatch again, although I was hoping that the
>> materialization to Arrow would offset some of that overhead.
>>
>> Wondering what my next step should be..
>> 1) Eliminate the extra conversion IO overhead by reading each column type
>> directly into ArrowColumnVector?
>> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch
>> mixin and expose the ColumnarBatch?
>>
>>
>> Appreciate your guidance,
>>
>> -Gautam.
>>
>>
>>
>> On Fri, May 24, 2019 at 5:28 PM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> if Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>>> Iterator[InternalRow] interface, it would still not work right? Coz it
>>> seems to me there is a lot more going on upstream in the operator execution
>>> path that would be needed to be done here.
>>>
>>> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an
>>> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what
>>> we want to take advantage of. You’re right that the first thing that Spark
>>> does is to get each row as InternalRow. But we still get a benefit from
>>> vectorizing the data materialization to Arrow itself. Spark execution is
>>> not vectorized, but that can be updated in Spark later (I think there’s a
>>> proposal).
>>>
>>> I wouldn’t pay too much attention to the Parquet vectorized path in
>>> Spark because it produces its own in-memory format and not Arrow. We want
>>> to take advantage of Arrow so that we can use dictionary-encoded columns in
>>> record batches. Spark’s vectorized Parquet reader also works directly on
>>> Parquet pages instead of a higher-level abstraction. I’m not sure we are
>>> going to want to do that right away instead of using the TripleIterator
>>> that Iceberg currently uses to abstract the Parquet page structure away.
>>>
>>> I would start by building converters that can build a column of Arrow
>>> data from a TripleIterator. Then we can stitch those columns together to
>>> get record batches and see how that performs. Then we can add complexity
>>> from there.
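A rough sketch of the converter idea above, assuming a flat optional column whose triples are (definition level, repetition level, value) and where a definition level below the maximum means null. `build_column` and `stitch` are hypothetical names, and lists plus a validity mask stand in for an Arrow vector; none of this is the Iceberg or Arrow API.

```python
from itertools import islice
from typing import Any, Dict, Iterator, List, Tuple

# (definition_level, repetition_level, value): stand-in for one triple.
Triple = Tuple[int, int, Any]
# (values, validity): stand-in for one column vector.
Column = Tuple[List[Any], List[bool]]


def build_column(triples: Iterator[Triple], max_def_level: int, batch_size: int) -> Column:
    """Consume up to batch_size triples from an iterator into one column."""
    values: List[Any] = []
    validity: List[bool] = []
    for def_level, _rep_level, value in islice(triples, batch_size):
        # For a flat column, only a fully defined triple carries a value.
        is_set = def_level == max_def_level
        values.append(value if is_set else None)
        validity.append(is_set)
    return values, validity


def stitch(columns: Dict[str, Column]) -> Dict[str, Any]:
    """Combine equal-length columns into one record batch."""
    lengths = {len(values) for values, _ in columns.values()}
    if len(lengths) > 1:
        raise ValueError("columns in a record batch must have equal length")
    return {"num_rows": lengths.pop() if lengths else 0, "columns": columns}
```

Pass iterators (not lists) to `build_column` so that repeated calls continue where the previous batch stopped. Nested or repeated columns would also need the repetition level, which this sketch ignores.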
>>>
>>> On Fri, May 24, 2019 at 4:28 PM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Hello devs,
>>>> As a follow-up to
>>>> https://github.com/apache/incubator-iceberg/issues/9 I've been reading
>>>> through how Spark does vectorized reading in its current implementation,
>>>> which is in the DataSource V1 path, to see how we can achieve the same
>>>> impact in Iceberg's reading. To start with, I want to form a high-level
>>>> understanding of the approach one would need to take to achieve this.
>>>> Pardon my ignorance, as I'm as new to the Spark codebase as I am to
>>>> Iceberg. Please correct me if my understanding is wrong.
>>>>
>>>> So here's what vectorization seems to be doing for Parquet reading:
>>>> - The DataSource scan execution uses ParquetFileFormat to build a
>>>> RecordReaderIterator [1], which underneath uses the
>>>> VectorizedParquetRecordReader.
>>>> - This record reader is used to iterate over entire batches of columns
>>>> (ColumnarBatch). The iterator.next() call returns a batch and not just a
>>>> row. The interfaces allow a ColumnarBatch to be passed around as a
>>>> generic Object, as stated here [2].
>>>> - On the scan execution side, whole-stage code generation compiles code
>>>> that consumes entire batches at a time so that physical operators take
>>>> advantage of the vectorization feature. So the scanner code is aware
>>>> that it's reading columnar batches out of the iterator.
>>>>
>>>>
>>>> I'm wondering how one should approach this if one is to achieve
>>>> vectorization in the Iceberg Reader (DataSourceV2) path. For instance, if
>>>> Iceberg Reader was to wrap Arrow or ColumnarBatch behind an
>>>> Iterator[InternalRow] interface, it would still not work right? Coz it
>>>> seems to me there is a lot more going on upstream in the operator execution
>>>> path that would be needed to be done here. It would be great if folks who
>>>> are more well-versed with the Spark codebase shed some light on this. In
>>>> general, what is the contract needed between V2 DataSourceReader (like
>>>> Iceberg) and the operator execution?
>>>>
>>>> thank you,
>>>> -Gautam.
>>>>
>>>>
>>>> [1] -
>>>> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
>>>> [2] -
>>>> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
