Hey Anton,

Here's the code: https://github.com/prodeezy/incubator-iceberg/pull/2/files .. Mind you, it's just a proof of concept to get something going, so please ignore the code design (or lack thereof :-) ). I've attached the flat data benchmark code as well.
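In case it helps while reading the PR, the harness is a plain JMH setup along these lines (simplified sketch only; the attachment is the real code, and the class name and data paths below are made up):

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.openjdk.jmh.annotations.*;
    import org.openjdk.jmh.infra.Blackhole;

    @Fork(1)
    @Warmup(iterations = 1)
    @Measurement(iterations = 3)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.SECONDS)
    @State(Scope.Benchmark)
    public class FlatDataReadBenchmark {

      // illustrative locations; the generated data is 10 files x 100k records x 10 columns
      private static final String PARQUET_DIR = "/tmp/flat-bench/parquet";
      private static final String ICEBERG_TABLE = "/tmp/flat-bench/iceberg";

      private SparkSession spark;

      @Setup
      public void startSpark() {
        spark = SparkSession.builder().master("local[1]").getOrCreate();
      }

      @TearDown
      public void stopSpark() {
        spark.stop();
      }

      @Benchmark
      public void readFileV1SourceVectorized(Blackhole blackhole) {
        // plain Spark Parquet source (DataSource V1, vectorized reader on by default)
        List<Row> rows = spark.read().parquet(PARQUET_DIR).collectAsList();
        blackhole.consume(rows);
      }

      @Benchmark
      public void readIcebergWithBatching100k(Blackhole blackhole) {
        // Iceberg source from the PoC branch; the no-batching and other batch-size
        // variants are separate methods with the same body
        List<Row> rows = spark.read().format("iceberg").load(ICEBERG_TABLE).collectAsList();
        blackhole.consume(rows);
      }
    }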
Lemme know what you think.

On Thu, Jun 13, 2019 at 10:56 PM Anton Okolnychyi <aokolnyc...@apple.com> wrote:

> Gautam, could you also share the code for benchmarks and conversion?
>
> Thanks,
> Anton
>
> On 13 Jun 2019, at 19:38, Ryan Blue <rb...@netflix.com.INVALID> wrote:
>
> > Sounds like a good start. I think the next step is to avoid using the ParquetReader.FileIterator and deserialize directly from TripleIterator <https://github.com/apache/incubator-iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java>. I think the reason this is taking longer is that you're doing all the work to materialize the data in rows, and then converting to vectors.
> >
> > To work on top of TripleIterator, I think you need to create a ParquetValueReader for Arrow batches. That would be configured with a batch size, so that when you pass it into ParquetReader, the FileIterator returns batches instead of individual rows.
> >
> > Does that make sense?
> >
> > rb
> >
> > On Wed, Jun 12, 2019 at 11:22 PM Gautam <gautamkows...@gmail.com> wrote:
> >
>> Hey Ryan and Anton,
>>
>> I wanted to circle back on some findings I had after taking a first stab at this ..
>>
>>> There's already a wrapper to adapt Arrow to ColumnarBatch, as well as an iterator to read a ColumnarBatch as a sequence of InternalRow. That's what we want to take advantage of.
>>
>> This is how I went about this... I wrapped the *ParquetReader.FileIterator* with an iterator that creates an Arrow batch from the rows (one batch for every 100 rows) returned by *FileIterator.next()*. I exposed each Arrow batch from this iterator as a ColumnarBatch, which has a *.rowIterator()* that reads it as a sequence of InternalRow. I return this in the *Reader.open()* call in Iceberg.
>>
>> Here are some microbenchmark numbers on flat Parquet file scanning ...
>>
>> 1 warmup, 3 iterations, 10 columns per row, 100k records per file, 10 files
>>
>> Benchmark                     Mode  Cnt  Score(sec)    Error  Units
>> readFileV1SourceVectorized    avgt    3       0.870  ± 1.883   s/op
>> readIcebergNoBatching         avgt    3       1.854  ± 2.003   s/op
>> readIcebergWithBatching100k   avgt    3       3.216  ± 0.520   s/op
>> readIcebergWithBatching10k    avgt    3       8.763  ± 2.338   s/op
>> readIcebergWithBatching5k     avgt    3      13.964  ± 6.328   s/op
>>
>> The batching doesn't seem to add any benefit. I measured the conversion times and am reading this as the overhead from extra copies to Arrow and then to ColumnarBatch again, although I was hoping that the materialization to Arrow would offset some of that overhead.
>>
>> Wondering what my next step should be ..
>> 1) Eliminate the extra conversion overhead by reading each column type directly into an ArrowColumnVector?
>> 2) Should I extend IcebergSource to support the SupportsScanColumnarBatch mixin and expose the ColumnarBatch?
>>
>> Appreciate your guidance,
>>
>> -Gautam.
>>
>> On Fri, May 24, 2019 at 5:28 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>
>>> if the Iceberg Reader was to wrap Arrow or ColumnarBatch behind an Iterator[InternalRow] interface, it would still not work right? Coz it seems to me there is a lot more going on upstream in the operator execution path that would need to be done here.
>>>
>>> There's already a wrapper to adapt Arrow to ColumnarBatch, as well as an iterator to read a ColumnarBatch as a sequence of InternalRow. That's what we want to take advantage of.
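>>>
>>> Roughly, the adapter is just this (a sketch only; the method name is made up, and it assumes you already have a populated Arrow VectorSchemaRoot for each batch):
>>>
>>>   import java.util.Iterator;
>>>   import org.apache.arrow.vector.VectorSchemaRoot;
>>>   import org.apache.spark.sql.catalyst.InternalRow;
>>>   import org.apache.spark.sql.vectorized.ArrowColumnVector;
>>>   import org.apache.spark.sql.vectorized.ColumnVector;
>>>   import org.apache.spark.sql.vectorized.ColumnarBatch;
>>>
>>>   // wrap each Arrow vector in Spark's ArrowColumnVector, stitch them into a
>>>   // ColumnarBatch, and hand rows back through rowIterator()
>>>   static Iterator<InternalRow> toRowIterator(VectorSchemaRoot root) {
>>>     ColumnVector[] columns = root.getFieldVectors().stream()
>>>         .map(ArrowColumnVector::new)
>>>         .toArray(ColumnVector[]::new);
>>>     ColumnarBatch batch = new ColumnarBatch(columns);
>>>     batch.setNumRows(root.getRowCount());
>>>     return batch.rowIterator();
>>>   }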
>>>
>>> You're right that the first thing Spark does is to get each row as an InternalRow. But we still get a benefit from vectorizing the data materialization to Arrow itself. Spark execution is not vectorized, but that can be updated in Spark later (I think there's a proposal).
>>>
>>> I wouldn't pay too much attention to the Parquet vectorized path in Spark because it produces its own in-memory format and not Arrow. We want to take advantage of Arrow so that we can use dictionary-encoded columns in record batches. Spark's vectorized Parquet reader also works directly on Parquet pages instead of a higher-level abstraction. I'm not sure we are going to want to do that right away instead of using the TripleIterator that Iceberg currently uses to abstract the Parquet page structure away.
>>>
>>> I would start by building converters that can build a column of Arrow data from a TripleIterator. Then we can stitch those columns together to get record batches and see how that performs. Then we can add complexity from there.
>>>
>>> On Fri, May 24, 2019 at 4:28 PM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Hello devs,
>>>> As a follow-up to https://github.com/apache/incubator-iceberg/issues/9 I've been reading through how Spark does vectorized reading in its current implementation, which is in the DataSource V1 path, trying to see how we can achieve the same impact in Iceberg's reading. To start with, I want to form a high-level understanding of the approach one would need to take to achieve this. Pardon my ignorance as I'm equally new to the Spark codebase as I am to Iceberg. Please correct me if my understanding is wrong.
>>>>
>>>> So here's what vectorization seems to be doing for Parquet reading:
>>>> - The DataSource scan execution uses ParquetFileFormat to build a RecordReaderIterator [1], which underneath uses the VectorizedParquetRecordReader.
>>>> - This record reader is used to iterate over entire batches of columns (ColumnarBatch). The iterator.next() call returns a batch and not just a row. The interfaces allow a ColumnarBatch to be passed around as a generic Object, as stated here [2].
>>>> - On the scan execution side, there is whole-stage code generation that compiles code that consumes entire batches at a time, so that physical operators take advantage of the vectorization feature. So the scanner code is aware that it's reading columnar batches out of the iterator.
>>>>
>>>> I'm wondering how one should approach this to achieve vectorization in the Iceberg Reader (DataSourceV2) path. For instance, if the Iceberg Reader was to wrap Arrow or ColumnarBatch behind an Iterator[InternalRow] interface, it would still not work, right? Coz it seems to me there is a lot more going on upstream in the operator execution path that would need to be done here. It would be great if folks who are more well-versed in the Spark codebase could shed some light on this. In general, what is the contract needed between a V2 DataSourceReader (like Iceberg) and the operator execution?
>>>>
>>>> thank you,
>>>> -Gautam.
>>>>
>>>> [1] - https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
>>>> [2] - https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix
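For reference, a rough sketch of the converter direction from the thread: fill one Arrow vector per column for a batch, driven by a per-column value iterator. Only the Arrow calls below are real API; the LongColumnIterator interface is a hypothetical stand-in for Iceberg's TripleIterator over Parquet pages.

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;

    // hypothetical stand-in for a TripleIterator-backed long column
    interface LongColumnIterator {
      boolean hasNext();
      int currentDefinitionLevel();  // below the max definition level means null here
      long nextLong();               // consume a non-null value
      void nextNull();               // consume a null position
    }

    class LongColumnBatchReader {
      private final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

      // read up to batchSize values from one column into an Arrow BigIntVector
      BigIntVector readBatch(LongColumnIterator column, int maxDefinitionLevel, int batchSize) {
        BigIntVector vector = new BigIntVector("col", allocator);
        vector.allocateNew(batchSize);
        int row = 0;
        while (row < batchSize && column.hasNext()) {
          if (column.currentDefinitionLevel() < maxDefinitionLevel) {
            vector.setNull(row);
            column.nextNull();
          } else {
            vector.setSafe(row, column.nextLong());
          }
          row++;
        }
        vector.setValueCount(row);
        return vector;
      }
    }

Vectors built this way, one per projected column, can then be stitched into a VectorSchemaRoot/ColumnarBatch as in the adapter sketch earlier in the thread.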