Hi everyone,

I did some experiments with splitting up wide Parquet files into multiple
column families. You can check the PR here:
https://github.com/apache/iceberg/pull/13306.

What the test does (a rough code sketch follows the list):

   - Creates tables with 100/1000/10000 columns, where the column type is
   double
   - Generates random data into these columns
   - Reads and writes records to these tables
      - Using the current implementation of the reader/writer
      - Using multiple (2, 5, 10) column families
      - Using single and multiple threads for parallelizing the
      reading/writing when there are multiple column families
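
For reference, here is a simplified sketch of how the column-family
variants are structured (not the exact code from the PR): the wide schema is
split into groups of column ids, and each group is written by its own task
when multi-threading is enabled. The FamilyWriter interface and the
partitioning helper below are illustrative placeholders, not Iceberg or
Parquet APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ColumnFamilySketch {

  /** Illustrative placeholder for "write these column ids into one Parquet file". */
  @FunctionalInterface
  interface FamilyWriter {
    void write(int[] columnIds) throws Exception;
  }

  /** Split column ids 0..columnCount-1 into `families` contiguous, near-equal groups. */
  static List<int[]> partition(int columnCount, int families) {
    List<int[]> groups = new ArrayList<>();
    int base = columnCount / families;
    int remainder = columnCount % families;
    int next = 0;
    for (int f = 0; f < families; f++) {
      int size = base + (f < remainder ? 1 : 0);
      int[] ids = new int[size];
      for (int i = 0; i < size; i++) {
        ids[i] = next++;
      }
      groups.add(ids);
    }
    return groups;
  }

  /** Write every family, either sequentially or with one task per family. */
  static void writeFamilies(
      int columnCount, int families, boolean multiThreaded, FamilyWriter writer)
      throws Exception {
    List<int[]> groups = partition(columnCount, families);
    if (!multiThreaded) {
      for (int[] group : groups) {
        writer.write(group);
      }
      return;
    }

    ExecutorService pool = Executors.newFixedThreadPool(groups.size());
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int[] group : groups) {
        futures.add(
            pool.submit(
                () -> {
                  writer.write(group);
                  return null;
                }));
      }
      for (Future<?> future : futures) {
        future.get(); // propagate failures from the writer tasks
      }
    } finally {
      pool.shutdown();
    }
  }
}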

I ran the tests on my local machine and stored the files on my local disk.

Here is what I have learned:

   - I had to be very strict about random data generation: if I reused any
   records, the Parquet writer exploited the duplication to decrease the size
   of the data files, and this was especially prominent with a high number of
   column families. It hints at additional gains from vertically splitting
   tables, but I wanted to keep it out of the tests since it is very
   use-case dependent (see the sketch after this list)
   - Read performance gains kick in after a few hundred columns, even when
   the reading is not parallelized. Parallelization does not add much on top
   - most probably my environment is IO bound
   - Write performance gains kick in sooner, but parallelization is needed
   to avoid losing performance. The gains are much more substantial, as more
   CPUs are used for compression.
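
To illustrate the first point, the generator in my runs produces fresh random
values for every row instead of reusing record instances. A simplified sketch
of the idea (not the PR code):

import java.util.SplittableRandom;

/**
 * Produces a fresh row of random doubles for every call. Reusing record
 * instances let the Parquet writer compress the duplicated values away
 * (likely via dictionary/RLE encoding), which skewed the file sizes, so
 * nothing is reused here.
 */
class RandomRowGenerator {
  private final SplittableRandom random = new SplittableRandom();
  private final int columnCount;

  RandomRowGenerator(int columnCount) {
    this.columnCount = columnCount;
  }

  double[] nextRow() {
    double[] row = new double[columnCount];
    for (int i = 0; i < columnCount; i++) {
      row[i] = random.nextDouble();
    }
    return row;
  }
}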

In these tests, *I have seen more than 15 percent read performance
improvements, and up to 50 percent write improvements*.

For me this signals that vertically splitting wide tables into multiple files
could help us use more of the available CPU/IO, and could bring substantial
performance gains on top of the functional features it makes possible.

If you think we should test different scenarios, feel free to use the code
in the PR, or share your thoughts.

Thanks,
Peter

The results in detail:

   - the families:0, multiThreaded:true rows can be ignored (these runs were
   skipped)
   - the values produced by the current reader/writer implementation are
   marked in *bold*


Benchmark                            (columns)  (families)  (multiThreaded)  Mode  Cnt   Score    Error  Units
MultiThreadedParquetBenchmark.read        100           0             true    ss   20  ≈ 10⁻⁷            s/op
*MultiThreadedParquetBenchmark.read        100           0            false    ss   20   3.739 ±  0.096   s/op*
MultiThreadedParquetBenchmark.read        100           1             true    ss   20   3.883 ±  0.062   s/op
MultiThreadedParquetBenchmark.read        100           1            false    ss   20   3.968 ±  0.070   s/op
MultiThreadedParquetBenchmark.read        100           2             true    ss   20   4.063 ±  0.080   s/op
MultiThreadedParquetBenchmark.read        100           2            false    ss   20   4.036 ±  0.082   s/op
MultiThreadedParquetBenchmark.read        100           5             true    ss   20   4.093 ±  0.083   s/op
MultiThreadedParquetBenchmark.read        100           5            false    ss   20   4.090 ±  0.070   s/op
MultiThreadedParquetBenchmark.read        100          10             true    ss   20   4.267 ±  0.087   s/op
MultiThreadedParquetBenchmark.read        100          10            false    ss   20   4.206 ±  0.075   s/op
MultiThreadedParquetBenchmark.read       1000           0             true    ss   20  ≈ 10⁻⁷            s/op
*MultiThreadedParquetBenchmark.read       1000           0            false    ss   20   5.276 ±  0.408   s/op*
MultiThreadedParquetBenchmark.read       1000           1             true    ss   20   5.202 ±  0.403   s/op
MultiThreadedParquetBenchmark.read       1000           1            false    ss   20   5.224 ±  0.397   s/op
MultiThreadedParquetBenchmark.read       1000           2             true    ss   20   4.881 ±  0.281   s/op
MultiThreadedParquetBenchmark.read       1000           2            false    ss   20   4.794 ±  0.295   s/op
MultiThreadedParquetBenchmark.read       1000           5             true    ss   20   5.096 ±  0.259   s/op
MultiThreadedParquetBenchmark.read       1000           5            false    ss   20   5.181 ±  0.288   s/op
MultiThreadedParquetBenchmark.read       1000          10             true    ss   20   5.408 ±  0.252   s/op
MultiThreadedParquetBenchmark.read       1000          10            false    ss   20   5.336 ±  0.185   s/op
MultiThreadedParquetBenchmark.read      10000           0             true    ss   20  ≈ 10⁻⁷            s/op
*MultiThreadedParquetBenchmark.read      10000           0            false    ss   20   8.692 ±  1.246   s/op*
MultiThreadedParquetBenchmark.read      10000           1             true    ss   20   8.337 ±  0.415   s/op
MultiThreadedParquetBenchmark.read      10000           1            false    ss   20   9.073 ±  0.864   s/op
MultiThreadedParquetBenchmark.read      10000           2             true    ss   20   8.180 ±  1.078   s/op
MultiThreadedParquetBenchmark.read      10000           2            false    ss   20   8.140 ±  1.852   s/op
MultiThreadedParquetBenchmark.read      10000           5             true    ss   20   7.300 ±  0.157   s/op
MultiThreadedParquetBenchmark.read      10000           5            false    ss   20   7.436 ±  0.538   s/op
MultiThreadedParquetBenchmark.read      10000          10             true    ss   20   7.108 ±  0.542   s/op
MultiThreadedParquetBenchmark.read      10000          10            false    ss   20   7.545 ±  0.369   s/op

Benchmark                             (columns)  (families)  (multiThreaded)  Mode  Cnt   Score    Error  Units
MultiThreadedParquetBenchmark.write        100           0             true    ss   20  ≈ 10⁻⁵            s/op
*MultiThreadedParquetBenchmark.write        100           0            false    ss   20  32.397 ±  0.149   s/op*
MultiThreadedParquetBenchmark.write        100           1             true    ss   20  30.131 ±  0.104   s/op
MultiThreadedParquetBenchmark.write        100           1            false    ss   20  33.453 ±  0.298   s/op
MultiThreadedParquetBenchmark.write        100           2             true    ss   20  19.974 ±  0.054   s/op
MultiThreadedParquetBenchmark.write        100           2            false    ss   20  30.766 ±  0.128   s/op
MultiThreadedParquetBenchmark.write        100           5             true    ss   20  20.624 ±  0.589   s/op
MultiThreadedParquetBenchmark.write        100           5            false    ss   20  29.911 ±  0.557   s/op
MultiThreadedParquetBenchmark.write        100          10             true    ss   20  32.833 ±  0.095   s/op
MultiThreadedParquetBenchmark.write        100          10            false    ss   20  30.026 ±  0.333   s/op
MultiThreadedParquetBenchmark.write       1000           0             true    ss   20  ≈ 10⁻⁶            s/op
*MultiThreadedParquetBenchmark.write       1000           0            false    ss   20  31.667 ±  0.703   s/op*
MultiThreadedParquetBenchmark.write       1000           1             true    ss   20  27.017 ±  0.332   s/op
MultiThreadedParquetBenchmark.write       1000           1            false    ss   20  32.549 ±  0.564   s/op
MultiThreadedParquetBenchmark.write       1000           2             true    ss   20  23.391 ±  0.143   s/op
MultiThreadedParquetBenchmark.write       1000           2            false    ss   20  37.228 ±  0.511   s/op
MultiThreadedParquetBenchmark.write       1000           5             true    ss   20  21.928 ±  0.495   s/op
MultiThreadedParquetBenchmark.write       1000           5            false    ss   20  37.102 ±  0.419   s/op
MultiThreadedParquetBenchmark.write       1000          10             true    ss   20  21.746 ±  0.204   s/op
MultiThreadedParquetBenchmark.write       1000          10            false    ss   20  37.436 ±  0.631   s/op
MultiThreadedParquetBenchmark.write      10000           0             true    ss   20  ≈ 10⁻⁶            s/op
*MultiThreadedParquetBenchmark.write      10000           0            false    ss   20  56.578 ±  3.320   s/op*
MultiThreadedParquetBenchmark.write      10000           1             true    ss   20  37.899 ±  1.668   s/op
MultiThreadedParquetBenchmark.write      10000           1            false    ss   20  58.548 ±  3.984   s/op
MultiThreadedParquetBenchmark.write      10000           2             true    ss   20  30.998 ±  1.585   s/op
MultiThreadedParquetBenchmark.write      10000           2            false    ss   20  66.287 ±  3.917   s/op
MultiThreadedParquetBenchmark.write      10000           5             true    ss   20  27.117 ±  1.537   s/op
MultiThreadedParquetBenchmark.write      10000           5            false    ss   20  72.872 ±  2.782   s/op
MultiThreadedParquetBenchmark.write      10000          10             true    ss   20  34.729 ±  1.158   s/op
MultiThreadedParquetBenchmark.write      10000          10            false    ss   20  73.049 ±  3.050   s/op

Micah Kornfield <emkornfi...@gmail.com> wrote (on Sat, Jun 7, 2025, at
7:57):

> At a high level we should probably work out whether supporting wide tables
> with performant appends is something we want to invest effort into, and focus
> on the lower-level questions once that is resolved.  I think it would be great
> to make this work; the main question is whether any PMC/community members feel
> like it would introduce too much complexity to proceed with further
> design/analysis.
>
> Some more detailed replies to what has been discussed in the thread:
>
> I might be wrong, but page skipping relies on page headers which are
>> stored in-line with the data itself. When downloading data from blob stores
>> this could be less than ideal.
>
>
> No, Parquet supports page indices
> <https://parquet.apache.org/docs/file-format/pageindex/> [1].  I think it
> is reasonable to also think about improvements to Parquet for large blobs
> so these can be handled better. There is also general interest in evolving
> Parquet to better support some of these use-cases, so if there are clear
> items that can be pushed down to the file level, let's have those
> conversations.
>
>
>> Would it not be more a data file/Parquet "issue"? Especially with the
>> data file API you are proposing, I think Iceberg should "delegate" to
>> the data file layer (Parquet here) and Iceberg could be "agnostic".
>
>
> I think we should maybe table the discussion on exactly what belongs
> in each layer until we have more data.  Roughly, the concerns expressed I
> think boil down into a few main buckets:
>
> 1.  Read vs Write amplification (I think one could run some rough
> experiments with low-level Parquet APIs to see the impact of splitting out
> columns into individual objects to answer both sides of this).
>      - For large blobs, I think memory pressure becomes a real concern
> here as well.
>
> 2.  Complexity:
>      - If multiple files are needed for performance, what advantages do we
> gain from having effectively two-level manifests?  What does it do to
> Iceberg metadata to have to track both (V4 is actually a great place to
> look at this since it seems like we are looking at major metadata overhauls
> anyway, if it is too ambitious we can perhaps postpone some of the work to
> v5)?
>      - What are the implications for things like time-travel, maintenance,
> etc. in these cases?  I would guess this probably needs a little bit more
> detailed design considering the two options (pushing some concerns down to
> parquet vs handling everything in Iceberg metadata).
>
>
> Some of the complexity questions can be answered by prototyping the APIs
> necessary to make this work. Specifically, I think we would at least need:
>
> 1.  A `newAppendColumns` API added to the transaction
> <https://iceberg.apache.org/javadoc/1.9.1/org/apache/iceberg/Transaction.html>
>  [2].
> Lance's APIs might provide some inspiration [3] here.
> 2a.  New abstractions to handle columns for the same rows split across
> files.
> 2b.  New File level APIs for
>            - Append columns
>            - Delete files (if we decide on multiple files for a row-range
> and it is pushed down to the file level, the deletion logic needs to be
> delegated to the file level as well).
>
> Items 2a/2b depend on the ultimate approach taken, but trying to sketch
> these out, and how they relate to the transaction API, might help inform
> the decision on complexity.
>
> Other feature interactions probably need a more careful analysis when
> proposing the spec changes.
>
> Cheers,
> Micah
>
> [1] https://parquet.apache.org/docs/file-format/pageindex/
> [2]
> https://iceberg.apache.org/javadoc/1.9.1/org/apache/iceberg/Transaction.html
> [3]
> https://lancedb.github.io/lance/introduction/schema_evolution.html#adding-new-columns
>
> On Fri, Jun 6, 2025 at 10:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Peter
>>
>> Thanks for your message. It's an interesting topic.
>>
>> Would it not be more a data file/Parquet "issue"? Especially with the
>> data file API you are proposing, I think Iceberg should "delegate" to
>> the data file layer (Parquet here) and Iceberg could be "agnostic".
>>
>> Regards
>> JB
>>
>> On Mon, May 26, 2025 at 6:28 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>> >
>> > Hi Team,
>> >
>> > In machine learning use-cases, it's common to encounter tables with a
>> very high number of columns - sometimes even in the range of several
>> thousand. I've seen cases with up to 15,000 columns. Storing such wide
>> tables in a single Parquet file is often suboptimal, as Parquet can become
>> a bottleneck, even when only a subset of columns is queried.
>> >
>> > A common approach to mitigate this is to split the data across multiple
>> Parquet files. With the upcoming File Format API, we could introduce a
>> layer that combines these files into a single iterator, enabling efficient
>> reading of wide and very wide tables.
>> >
>> > To support this, we would need to revise the metadata specification.
>> Instead of the current `_file` column, we could introduce a `_files` column
>> containing:
>> > - `_file_column_ids`: the column IDs present in each file
>> > - `_file_path`: the path to the corresponding file
>> >
>> > Has there been any prior discussion around this idea?
>> > Is anyone else interested in exploring this further?
>> >
>> > Best regards,
>> > Peter
>>
>
