Hello Devs,

As some of you know, there's been ongoing work as part of [1] to build
Arrow-based vectorization into Iceberg. There's a separate thread on this dev
list where that is being discussed, and progress is being tracked in a separate
branch [2]. The overall approach there is to build a ground-up, Arrow-based
implementation of vectorization within Iceberg, so that any compute engine
using Iceberg would benefit from those optimizations. We feel that is indeed
the longer-term solution and the best way forward.

Meanwhile, Xabriel and I investigated an interim approach where Iceberg could
use the vectorization code already built into Spark's Parquet reading, which I
will refer to as "*V1 Vectorization*". This is the code path that Spark's
DataSourceV1 readers use to read Parquet data. The result is that we now have
performance parity between Iceberg and Spark's vanilla Parquet reader. We
thought we should share this with the larger community so others can benefit
from this gain.
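
For reference, this is what the V1 vectorized path looks like from the user's
side in vanilla Spark; a minimal sketch, with an illustrative table path:

  // Vanilla Spark's V1 vectorized Parquet read (DataSourceV1 path).
  // spark.sql.parquet.enableVectorizedReader is on by default and only
  // kicks in for flat (atomic-typed) schemas.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("v1-vectorization-demo").getOrCreate()
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
  val vanilla = spark.read.parquet("hdfs:///warehouse/db/flat_table")  // illustrative path
  vanilla.count()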

*What we did*:
- Added a new reader, viz. *V1VectorizedReader*, that internally short-circuits
to the V1 code path [3], which does most of the setup and work to perform
vectorization. It is exactly what vanilla Spark's reader does underneath the
DSV1 implementation.
- It builds an iterator that expects ColumnarBatches from the objects returned
by the resolving iterator.
- We re-organized and optimized the code that builds *ReadTask* instances,
which considerably improved task initiation and planning time.
- Setting `*iceberg.read.enableV1VectorizedReader*` to true enables this reader
in IcebergSource (a usage sketch follows this list).
- The V1Vectorized reader is an independent class with copied code in some
methods, as we didn't want to degrade performance due to inheritance/virtual
method calls (we noticed degradation when we did try to re-use code).
- I've pushed this code to a separate branch [4] in case others want to give
this a try.
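
Here is a minimal usage sketch for that flag. It assumes the flag is passed as
a DataSourceV2 read option (whether it is a read option or a Spark/Hadoop conf
in the branch [4] is an assumption here), and the table path is illustrative:

  // Hedged sketch: enabling the interim V1-vectorized reader on IcebergSource.
  // The option name comes from the branch [4]; the path below is made up.
  val df = spark.read
    .format("iceberg")
    .option("iceberg.read.enableV1VectorizedReader", "true")
    .load("hdfs:///warehouse/db/flat_table")
  df.count()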


*The Numbers*:


Flat Data: 10 files, 10M rows each

Benchmark                                                               Mode  Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceNonVectorized    ss    5  63.631 ± 1.300   s/op
IcebergSourceFlatParquetDataReadBenchmark.readFileSourceVectorized       ss    5  28.322 ± 2.400   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIceberg                    ss    5  65.862 ± 2.480   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized10k     ss    5  28.199 ± 1.255   s/op
IcebergSourceFlatParquetDataReadBenchmark.readIcebergV1Vectorized20k     ss    5  29.822 ± 2.848   s/op
IcebergSourceFlatParquetDataReadBenchmark.*readIcebergV1Vectorized5k*    ss    5  27.953 ± 0.949   s/op




Flat Data Projections: 10 files, 10M rows each

Benchmark                                                                             Mode  Cnt   Score   Error  Units
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5  11.307 ± 1.791   s/op
IcebergSourceFlatParquetDataReadBenchmark.*readWithProjectionFileSourceVectorized*     ss    5   3.480 ± 0.087   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIceberg                    ss    5  11.057 ± 0.236   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized10k     ss    5   3.953 ± 1.592   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized20k     ss    5   3.619 ± 1.305   s/op
IcebergSourceFlatParquetDataReadBenchmark.readWithProjectionIcebergV1Vectorized5k      ss    5   4.109 ± 1.734   s/op


Filtered Data: 500 files, 10k rows each

Benchmark                                                                           Mode  Cnt  Score   Error  Units
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceNonVectorized    ss    5  2.139 ± 0.719   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterFileSourceVectorized       ss    5  2.213 ± 0.598   s/op
IcebergSourceFlatParquetDataFilterBenchmark.*readWithFilterIcebergNonVectorized*     ss    5  0.144 ± 0.029   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized100k    ss    5  0.179 ± 0.019   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized10k     ss    5  0.189 ± 0.046   s/op
IcebergSourceFlatParquetDataFilterBenchmark.readWithFilterIcebergV1Vectorized5k      ss    5  0.195 ± 0.137   s/op


*Perf Notes*:
- Iceberg V1 Vectorization's real gain (over the current Iceberg impl) is in
flat data scans. Notice how it's almost exactly the same as vanilla Spark
vectorization.
- Projections work equally well. Nested column projections, however, still
don't perform as well; for that we need the ability to push nested column
projections down to Iceberg.
- We saw a slight overhead with Iceberg V1 Vectorization on smaller workloads,
but it goes away with larger data files.

*Why we think this is useful*:
- This approach allows users to benefit from both: 1) Iceberg's metadata
filtering and 2) Spark's scan vectorization. This should help with Iceberg
adoption.
- We think this can be an interim solution (until the Arrow-based impl is fully
performant) for those who are currently blocked by the performance difference
between Iceberg and Spark's native vectorization for interactive use cases. A
lot of optimization work and testing has gone into V1 vectorization, and
Iceberg can now benefit from it.
- Many companies have proprietary implementations of *ParquetFileFormat* with
extended features like complex type support. Our code can use those at runtime
as long as the '*buildReaderWithPartitionValues()*' signature is consistent; if
not, the reader can easily be modified to plug in their own vectorized reader
(see the sketch after this list).
- While profiling the Arrow implementation, I found it difficult to compare
bottlenecks due to major differences between the DSv1 and DSv2 client-to-source
interface paths. This change makes it easier to compare numbers and profile
code between V1 vectorization and Arrow vectorization, as we now have both
paths working behind a single DataSourceV2 path (viz. IcebergSource).
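
To make that delegation point concrete, here is a rough sketch (not the actual
V1VectorizedReader code from the branch) of how a DSv2 reader can hand a file
split to Spark's V1 Parquet path and get ColumnarBatches back. buildV1Batches
is a hypothetical helper; the API shown is Spark 2.4's ParquetFileFormat:

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.execution.datasources.PartitionedFile
  import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
  import org.apache.spark.sql.sources.Filter
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.vectorized.ColumnarBatch

  // Hypothetical helper illustrating the delegation; not the branch's code.
  def buildV1Batches(
      spark: SparkSession,
      dataSchema: StructType,      // full file schema
      requiredSchema: StructType,  // projected columns
      filters: Seq[Filter],
      hadoopConf: Configuration,
      file: PartitionedFile): Iterator[ColumnarBatch] = {

    // Any ParquetFileFormat with this method signature (e.g. a proprietary
    // fork with complex type support) could be substituted here.
    val readFunc = new ParquetFileFormat().buildReaderWithPartitionValues(
      spark,
      dataSchema,
      StructType(Nil),             // no Spark-side partition columns; Iceberg handles them
      requiredSchema,
      filters,
      Map.empty[String, String],
      hadoopConf)

    // When Spark chooses the vectorized path (flat schema, vectorized reader
    // enabled), the objects produced are really ColumnarBatches, so this cast
    // mirrors what the DSv1 path itself does internally.
    readFunc(file).asInstanceOf[Iterator[ColumnarBatch]]
  }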

*Limitations*:
- This implementation is specific to Spark, so other compute frameworks like
Presto won't benefit from it.
- It doesn't use Iceberg's value reader interface, as it bypasses everything
under the task data reading (we added a separate *V1VectorizedTaskDataReader*).
- We need to maintain two readers, as adding any code to Reader.java might
require changes to the V1Vectorized reader as well. We could minimize this with
a *ReaderUtils* class, though.


I have the code checked in at
https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader .
If folks think this is useful and we can keep this as an interim solution
behind a feature flag, I can get a PR up with proper unit tests.

thanks and regards,
-Gautam.


[1] - https://github.com/apache/incubator-iceberg/issues/9
[2] - https://github.com/apache/incubator-iceberg/tree/vectorized-read
[3] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L197
[4] - https://github.com/prodeezy/incubator-iceberg/tree/v1-vectorized-reader
