> You’re right that the first thing that Spark does is to get each row as 
> InternalRow. But we still get a benefit from vectorizing the data 
> materialization to Arrow itself. Spark execution is not vectorized, but that 
> can be updated in Spark later (I think there’s a proposal).
> 
I am not sure I fully understand this statement. Ryan, could you clarify it?

To the best of my knowledge, Spark reads each column separately and constructs 
ColumnarBatches, which hold a ColumnVector for every column. Spark leverages 
MutableColumnarRow (which extends InternalRow) to expose data in columnar 
batches as rows. So, we don’t really create rows. 
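
To illustrate what I mean by exposing a batch as rows without creating them, 
here is a minimal, self-contained sketch. It does not use Spark's classes: 
Batch, RowView and rows() are hypothetical stand-ins for ColumnarBatch, 
MutableColumnarRow and the batch's row iterator, and the two fixed columns 
are an assumption made only to keep the example short.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class RowViewSketch {
    /** Hypothetical stand-in for a columnar batch: one primitive array per column. */
    static final class Batch {
        final int[] ids;
        final double[] prices;
        final int numRows;

        Batch(int[] ids, double[] prices) {
            if (ids.length != prices.length) {
                throw new IllegalArgumentException("column lengths differ");
            }
            this.ids = ids;
            this.prices = prices;
            this.numRows = ids.length;
        }
    }

    /** Row-shaped cursor: holds a batch reference and a row index, copies no data. */
    static final class RowView {
        private final Batch batch;
        int rowId;

        RowView(Batch batch) {
            this.batch = batch;
        }

        int getId() {
            return batch.ids[rowId];
        }

        double getPrice() {
            return batch.prices[rowId];
        }
    }

    /** Hands out the same mutable view for every row and only advances its index. */
    static Iterator<RowView> rows(Batch batch) {
        RowView view = new RowView(batch);
        return new Iterator<RowView>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return next < batch.numRows;
            }

            @Override
            public RowView next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                view.rowId = next++;
                return view;
            }
        };
    }

    public static void main(String[] args) {
        Batch batch = new Batch(new int[] {1, 2, 3}, new double[] {0.5, 1.5, 2.5});
        double total = 0;
        for (Iterator<RowView> it = rows(batch); it.hasNext(); ) {
            RowView row = it.next();
            total += row.getId() * row.getPrice();
        }
        System.out.println(total);
    }
}
```

Because the view is reused, a consumer that wants to keep a row beyond the 
next call to next() would have to copy it first.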

In 2.4, data sources that implement SupportsScanColumnarBatch can expose data 
as ColumnarBatches. This is taken into account by DataSourceV2ScanExec, which 
implements ColumnarBatchScan. The latter is responsible for producing code 
that consumes batches of data, which allows Spark to benefit from 
SIMD/instruction pipelining and so on.
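
A rough sketch of the difference between the two consumption styles, in plain 
Java rather than Spark's generated code. The Row interface and both methods 
are hypothetical; whether the JIT actually emits SIMD instructions for the 
batch loop depends on the JVM and hardware and is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchLoopSketch {
    /** Hypothetical row interface: one interface call per value read. */
    interface Row {
        long getLong(int ordinal);
    }

    /** Row-at-a-time consumer: pulls a row, reads one field, repeats. */
    static long sumRowAtATime(Iterator<Row> rows, int ordinal) {
        long sum = 0;
        while (rows.hasNext()) {
            sum += rows.next().getLong(ordinal);
        }
        return sum;
    }

    /** Batch consumer: a tight loop over a primitive array with no per-value calls. */
    static long sumBatch(long[] column, int numRows) {
        long sum = 0;
        for (int i = 0; i < numRows; i++) {
            sum += column[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] column = {3, 5, 7, 11};

        // Wrap each value as a single-column row to feed the row-at-a-time path.
        List<Row> rows = new ArrayList<>();
        for (long v : column) {
            rows.add(ordinal -> v);
        }

        System.out.println(sumRowAtATime(rows.iterator(), 0));
        System.out.println(sumBatch(column, column.length));
    }
}
```

Both paths compute the same sum; the batch version simply gives the compiler 
a loop shape that is easier to optimize.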

Is our plan to read Parquet data into Arrow and then convert it into 
ColumnarBatch? And once Spark truly supports Arrow (I still have to go through 
that proposal) we will get rid of this conversion?
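
To make the question concrete, here is a minimal sketch of the first step as I 
understand it: turning a stream of triples into one column, then stitching 
columns into a batch. It uses neither Iceberg's TripleIterator nor Arrow's 
vectors. Triple, IntColumn, Batch, buildColumn and stitch are all hypothetical 
names, and the sketch assumes a flat optional int column where a definition 
level below the maximum means null.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TripleToColumnSketch {
    /** Hypothetical simplified triple: definition level plus an int value. */
    static final class Triple {
        final int definitionLevel;
        final int value;

        Triple(int definitionLevel, int value) {
            this.definitionLevel = definitionLevel;
            this.value = value;
        }
    }

    /** Hypothetical column: values plus a null mask of the same length. */
    static final class IntColumn {
        final int[] values;
        final boolean[] isNull;
        final int numRows;

        IntColumn(int[] values, boolean[] isNull) {
            this.values = values;
            this.isNull = isNull;
            this.numRows = values.length;
        }
    }

    /** Hypothetical batch: columns that all share the same row count. */
    static final class Batch {
        final List<IntColumn> columns;
        final int numRows;

        Batch(List<IntColumn> columns, int numRows) {
            this.columns = columns;
            this.numRows = numRows;
        }
    }

    /** Drains up to batchSize triples into one column (flat schema assumed). */
    static IntColumn buildColumn(Iterator<Triple> triples, int maxDefinitionLevel, int batchSize) {
        int[] values = new int[batchSize];
        boolean[] isNull = new boolean[batchSize];
        int n = 0;
        while (n < batchSize && triples.hasNext()) {
            Triple t = triples.next();
            if (t.definitionLevel < maxDefinitionLevel) {
                isNull[n] = true;   // value is not defined at this position
            } else {
                values[n] = t.value;
            }
            n++;
        }
        // Trim to the number of rows actually read (the last batch may be short).
        return new IntColumn(Arrays.copyOf(values, n), Arrays.copyOf(isNull, n));
    }

    /** Stitches columns into a batch after checking that row counts agree. */
    static Batch stitch(List<IntColumn> columns) {
        int numRows = columns.isEmpty() ? 0 : columns.get(0).numRows;
        for (IntColumn c : columns) {
            if (c.numRows != numRows) {
                throw new IllegalArgumentException("columns have different row counts");
            }
        }
        return new Batch(columns, numRows);
    }

    public static void main(String[] args) {
        List<Triple> a = Arrays.asList(new Triple(1, 10), new Triple(0, 0), new Triple(1, 30));
        List<Triple> b = Arrays.asList(new Triple(1, 7), new Triple(1, 8), new Triple(0, 0));
        Batch batch = stitch(Arrays.asList(
            buildColumn(a.iterator(), 1, 1024),
            buildColumn(b.iterator(), 1, 1024)));
        System.out.println(batch.numRows + " rows, " + batch.columns.size() + " columns");
    }
}
```

Nested and repeated fields would also need repetition levels, which this 
sketch leaves out.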

Thanks,
Anton

> On 25 May 2019, at 02:44, Gautam <gautamkows...@gmail.com> wrote:
> 
> > There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> > iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what 
> > we want to take advantage of. You’re right that the first thing that Spark 
> > does is to get each row as InternalRow. But we still get a benefit from 
> > vectorizing the data materialization to Arrow itself. Spark execution is 
> > not vectorized, but that can be updated in Spark later (I think there’s a 
> > proposal).
> 
> Got it. So to clarify: for starters we may not get the *entire* benefit from 
> vectorization (because Spark's operator execution still needs to take 
> advantage of Arrow upstream), but by reading Parquet into Arrow and exposing 
> it as an Iterator<InternalRow> we still benefit from the columnar 
> materialization. 
> 
> >  I’m not sure we are going to want to do that right away instead of using 
> > the TripleIterator that Iceberg currently uses to abstract the Parquet page 
> > structure away.
> 
> I agree overall with the Arrow approach. I was looking at Spark's 
> vectorization just to understand how the speedup happens and what the 
> contracts should be in the Iceberg flow when we do that using Arrow. My main 
> concern was that, even if I did read this into Arrow, if the transfer is 
> row-by-row we might be losing the whole point of using an in-memory columnar 
> format. But as you said in the first point, we still get a lot of the benefit 
> just from organizing columns in memory this way. 
> 
> On Fri, May 24, 2019 at 5:28 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
> > if Iceberg Reader were to wrap Arrow or ColumnarBatch behind an 
> > Iterator[InternalRow] interface, it would still not work, right? Because it 
> > seems to me there is a lot more going on upstream in the operator execution 
> > path that would need to be done here.
> 
> There’s already a wrapper to adapt Arrow to ColumnarBatch, as well as an 
> iterator to read a ColumnarBatch as a sequence of InternalRow. That’s what we 
> want to take advantage of. You’re right that the first thing that Spark does 
> is to get each row as InternalRow. But we still get a benefit from 
> vectorizing the data materialization to Arrow itself. Spark execution is not 
> vectorized, but that can be updated in Spark later (I think there’s a 
> proposal).
> 
> I wouldn’t pay too much attention to the Parquet vectorized path in Spark 
> because it produces its own in-memory format and not Arrow. We want to take 
> advantage of Arrow so that we can use dictionary-encoded columns in record 
> batches. Spark’s vectorized Parquet reader also works directly on Parquet 
> pages instead of a higher-level abstraction. I’m not sure we are going to 
> want to do that right away instead of using the TripleIterator that Iceberg 
> currently uses to abstract the Parquet page structure away.
> 
> I would start by building converters that can build a column of Arrow data 
> from a TripleIterator. Then we can stitch those columns together to get 
> record batches and see how that performs. Then we can add complexity from 
> there.
> 
> 
> On Fri, May 24, 2019 at 4:28 PM Gautam <gautamkows...@gmail.com> wrote:
> Hello devs,
>        As a follow-up to https://github.com/apache/incubator-iceberg/issues/9 
> I've been reading through how Spark does vectorized reading in its current 
> implementation, which is in the DataSource V1 path. I'm trying to see how we 
> can achieve the same impact in Iceberg's reading. To start with, I want to 
> form a high-level understanding of the approach one would need to take to 
> achieve this. Pardon my ignorance, as I'm equally new to the Spark codebase 
> as I am to Iceberg. Please correct me if my understanding is wrong.
> 
> So here's what vectorization seems to be doing for Parquet reading: 
> - The DataSource scan execution uses ParquetFileFormat to build a 
> RecordReaderIterator [1], which underneath uses the 
> VectorizedParquetRecordReader.
> - This record reader is used to iterate over entire batches of columns 
> (ColumnarBatch). The iterator.next() call returns a batch and not just a row. 
> The interfaces allow a ColumnarBatch to be passed around as a generic Object, 
> as stated here [2].
> - On the scan execution side, whole-stage code generation compiles code that 
> consumes entire batches at a time so that physical operators take advantage 
> of the vectorization feature. So the scanner code is aware that it's reading 
> columnar batches out of the iterator. 
> 
> 
> I'm wondering how one should approach this if one is to achieve vectorization 
> in the Iceberg Reader (DataSourceV2) path. For instance, if Iceberg Reader 
> were to wrap Arrow or ColumnarBatch behind an Iterator[InternalRow] 
> interface, it would still not work, right? Because it seems to me there is a 
> lot more going on upstream in the operator execution path that would need to 
> be done here. It would be great if folks who are better versed in the Spark 
> codebase could shed some light on this. In general, what is the contract 
> needed between a V2 DataSourceReader (like Iceberg) and the operator 
> execution? 
> 
> thank you,
> -Gautam.
> 
> 
> [1] - 
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412
> [2] - 
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala#L29
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix
