Hi everyone, I did some experiments with splitting up wide Parquet files into multiple column families. You can check the PR here: https://github.com/apache/iceberg/pull/13306.
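To give a feel for the setup, here is a minimal JMH skeleton that mirrors the parameters described below. The class name, annotation values and empty method bodies are illustrative only (the mode "ss" and the 20 iterations match the result tables); the real benchmark is MultiThreadedParquetBenchmark in the linked PR.

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@Fork(1)
@State(Scope.Benchmark)
@BenchmarkMode(Mode.SingleShotTime)  // matches the "ss" mode in the results below
@Measurement(iterations = 20)        // matches Cnt = 20
public class WideTableBenchmarkSketch {

  @Param({"100", "1000", "10000"})   // number of double columns in the table
  public int columns;

  @Param({"0", "1", "2", "5", "10"}) // 0 = current single-file reader/writer, >0 = column families
  public int families;

  @Param({"true", "false"})          // run the per-family readers/writers on separate threads?
  public boolean multiThreaded;

  @Benchmark
  public void write() {
    // Generate fresh random records (no reuse, so the writer cannot exploit
    // duplication) and write them either to a single Parquet file
    // (families == 0) or to one file per column family, optionally in parallel.
  }

  @Benchmark
  public void read() {
    // Read the written files back and merge the column families into a single
    // row iterator, optionally fetching the families on separate threads.
  }
}

Each (columns, families, multiThreaded) combination in the result tables below corresponds to one such parameter triple.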
What the test does:
- Creates tables with 100/1000/10000 columns, where the column type is double
- Generates random data into these columns
- Reads and writes records to these tables
  - Using the current implementation of the reader/writer
  - Using multiple (2, 5, 10) column families
  - Using single and multiple threads to parallelize the reading/writing when there are multiple column families

I used my local machine to run the tests and my local disk to store the files.

Here is what I have learned:
- I had to be very strict about random generation: if I reused any records, the Parquet writer exploited the duplication to decrease the size of the data files. This was especially prominent with a high number of column families. It highlights the possible gains from vertically splitting tables, but I wanted to keep it out of these tests because it is very use-case dependent.
- Read performance gains kick in after a few hundred columns. We gain them even when the reading is not parallelized, and parallelization adds little - most probably my environment is IO bound.
- Write performance gains kick in sooner, but need parallelization to avoid losing performance. The gains are much more substantial, as more CPUs are used for compression.

In these tests, I have seen more than 15 percent read performance improvements, and up to 50 percent write improvements. To me this signals that vertically splitting wide tables into multiple files could help us use more of the available CPU/IO, and could bring substantial gains on top of the functional features it makes possible.

If you think we should test different scenarios, feel free to use the code in the PR, or share your thoughts.

Thanks,
Peter

The results in detail:
- families:0, multiThreaded:true rows can be ignored (these runs were skipped)
- the values generated by the current readers/writers are marked with *bold*

Benchmark                             (columns)  (families)  (multiThreaded)  Mode  Cnt   Score    Error  Units
MultiThreadedParquetBenchmark.read          100           0             true    ss   20  ≈ 10⁻⁷            s/op
*MultiThreadedParquetBenchmark.read         100           0            false    ss   20   3.739 ±  0.096   s/op*
MultiThreadedParquetBenchmark.read          100           1             true    ss   20   3.883 ±  0.062   s/op
MultiThreadedParquetBenchmark.read          100           1            false    ss   20   3.968 ±  0.070   s/op
MultiThreadedParquetBenchmark.read          100           2             true    ss   20   4.063 ±  0.080   s/op
MultiThreadedParquetBenchmark.read          100           2            false    ss   20   4.036 ±  0.082   s/op
MultiThreadedParquetBenchmark.read          100           5             true    ss   20   4.093 ±  0.083   s/op
MultiThreadedParquetBenchmark.read          100           5            false    ss   20   4.090 ±  0.070   s/op
MultiThreadedParquetBenchmark.read          100          10             true    ss   20   4.267 ±  0.087   s/op
MultiThreadedParquetBenchmark.read          100          10            false    ss   20   4.206 ±  0.075   s/op
MultiThreadedParquetBenchmark.read         1000           0             true    ss   20  ≈ 10⁻⁷            s/op
*MultiThreadedParquetBenchmark.read        1000           0            false    ss   20   5.276 ±  0.408   s/op*
MultiThreadedParquetBenchmark.read         1000           1             true    ss   20   5.202 ±  0.403   s/op
MultiThreadedParquetBenchmark.read         1000           1            false    ss   20   5.224 ±  0.397   s/op
MultiThreadedParquetBenchmark.read         1000           2             true    ss   20   4.881 ±  0.281   s/op
MultiThreadedParquetBenchmark.read         1000           2            false    ss   20   4.794 ±  0.295   s/op
MultiThreadedParquetBenchmark.read         1000           5             true    ss   20   5.096 ±  0.259   s/op
MultiThreadedParquetBenchmark.read         1000           5            false    ss   20   5.181 ±  0.288   s/op
MultiThreadedParquetBenchmark.read         1000          10             true    ss   20   5.408 ±  0.252   s/op
MultiThreadedParquetBenchmark.read         1000          10            false    ss   20   5.336 ±  0.185   s/op
MultiThreadedParquetBenchmark.read        10000           0             true    ss   20  ≈ 10⁻⁷            s/op
*MultiThreadedParquetBenchmark.read       10000           0            false    ss   20   8.692 ±  1.246   s/op*
MultiThreadedParquetBenchmark.read        10000           1             true    ss   20   8.337 ±  0.415   s/op
MultiThreadedParquetBenchmark.read        10000           1            false    ss   20   9.073 ±  0.864   s/op
MultiThreadedParquetBenchmark.read        10000           2             true    ss   20   8.180 ±  1.078   s/op
MultiThreadedParquetBenchmark.read        10000           2            false    ss   20   8.140 ±  1.852   s/op
MultiThreadedParquetBenchmark.read        10000           5             true    ss   20   7.300 ±  0.157   s/op
MultiThreadedParquetBenchmark.read        10000           5            false    ss   20   7.436 ±  0.538   s/op
MultiThreadedParquetBenchmark.read        10000          10             true    ss   20   7.108 ±  0.542   s/op
MultiThreadedParquetBenchmark.read        10000          10            false    ss   20   7.545 ±  0.369   s/op

Benchmark                             (columns)  (families)  (multiThreaded)  Mode  Cnt   Score    Error  Units
MultiThreadedParquetBenchmark.write         100           0             true    ss   20  ≈ 10⁻⁵            s/op
*MultiThreadedParquetBenchmark.write        100           0            false    ss   20  32.397 ±  0.149   s/op*
MultiThreadedParquetBenchmark.write         100           1             true    ss   20  30.131 ±  0.104   s/op
MultiThreadedParquetBenchmark.write         100           1            false    ss   20  33.453 ±  0.298   s/op
MultiThreadedParquetBenchmark.write         100           2             true    ss   20  19.974 ±  0.054   s/op
MultiThreadedParquetBenchmark.write         100           2            false    ss   20  30.766 ±  0.128   s/op
MultiThreadedParquetBenchmark.write         100           5             true    ss   20  20.624 ±  0.589   s/op
MultiThreadedParquetBenchmark.write         100           5            false    ss   20  29.911 ±  0.557   s/op
MultiThreadedParquetBenchmark.write         100          10             true    ss   20  32.833 ±  0.095   s/op
MultiThreadedParquetBenchmark.write         100          10            false    ss   20  30.026 ±  0.333   s/op
MultiThreadedParquetBenchmark.write        1000           0             true    ss   20  ≈ 10⁻⁶            s/op
*MultiThreadedParquetBenchmark.write       1000           0            false    ss   20  31.667 ±  0.703   s/op*
MultiThreadedParquetBenchmark.write        1000           1             true    ss   20  27.017 ±  0.332   s/op
MultiThreadedParquetBenchmark.write        1000           1            false    ss   20  32.549 ±  0.564   s/op
MultiThreadedParquetBenchmark.write        1000           2             true    ss   20  23.391 ±  0.143   s/op
MultiThreadedParquetBenchmark.write        1000           2            false    ss   20  37.228 ±  0.511   s/op
MultiThreadedParquetBenchmark.write        1000           5             true    ss   20  21.928 ±  0.495   s/op
MultiThreadedParquetBenchmark.write        1000           5            false    ss   20  37.102 ±  0.419   s/op
MultiThreadedParquetBenchmark.write        1000          10             true    ss   20  21.746 ±  0.204   s/op
MultiThreadedParquetBenchmark.write        1000          10            false    ss   20  37.436 ±  0.631   s/op
MultiThreadedParquetBenchmark.write       10000           0             true    ss   20  ≈ 10⁻⁶            s/op
*MultiThreadedParquetBenchmark.write      10000           0            false    ss   20  56.578 ±  3.320   s/op*
MultiThreadedParquetBenchmark.write       10000           1             true    ss   20  37.899 ±  1.668   s/op
MultiThreadedParquetBenchmark.write       10000           1            false    ss   20  58.548 ±  3.984   s/op
MultiThreadedParquetBenchmark.write       10000           2             true    ss   20  30.998 ±  1.585   s/op
MultiThreadedParquetBenchmark.write       10000           2            false    ss   20  66.287 ±  3.917   s/op
MultiThreadedParquetBenchmark.write       10000           5             true    ss   20  27.117 ±  1.537   s/op
MultiThreadedParquetBenchmark.write       10000           5            false    ss   20  72.872 ±  2.782   s/op
MultiThreadedParquetBenchmark.write       10000          10             true    ss   20  34.729 ±  1.158   s/op
MultiThreadedParquetBenchmark.write       10000          10            false    ss   20  73.049 ±  3.050   s/op

Micah Kornfield <emkornfi...@gmail.com> wrote (on Sat, Jun 7, 2025, at 7:57):

> At a high-level we should probably work out if supporting wide tables with performant appends is something we want to invest effort into, and focus on the lower level questions once that is resolved. I think it would be great to make this work; I think the main question is whether any PMC/community members feel it would introduce too much complexity to proceed with further design/analysis.
>
> Some more detailed replies to what has been discussed in the thread:
>
>> I might be wrong, but page skipping relies on page headers which are stored in-line with the data itself. When downloading data from blob stores this could be less than ideal.
>
> No, Parquet supports page indices <https://parquet.apache.org/docs/file-format/pageindex/> [1]. I think it is reasonable to also think about improvements to Parquet for large blobs so these can be handled better. There is also general interest in Parquet in evolving it to better support some of these use-cases in general, so if there are clear items that can be pushed to the file level, let's have those conversations.
>
>> Would it not be more a data file/parquet "issue" ? Especially with the data file API you are proposing, I think Iceberg should "delegate" to the data file layer (Parquet here) and Iceberg could be "agnostic".
>
> I think we should table the discussion on exactly what belongs in each layer until we have more data. Roughly, I think the concerns expressed boil down into a few main buckets:
>
> 1. Read vs write amplification (I think one could run some rough experiments with low-level Parquet APIs to see the impact of splitting out columns into individual objects to answer both sides of this).
>    - For large blobs, I think memory pressure becomes a real concern here as well.
>
> 2. Complexity:
>    - If multiple files are needed for performance, what advantages do we gain from having effectively two-level manifests? What does it do to Iceberg metadata to have to track both (V4 is actually a great place to look at this since it seems like we are looking at major metadata overhauls anyway; if it is too ambitious we can perhaps postpone some of the work to v5)?
>    - What are the implications for things like time-travel, maintenance, etc. in these cases? I would guess this probably needs a little bit more detailed design considering the two options (pushing some concerns down to Parquet vs handling everything in Iceberg metadata).
>
> Some of the complexity questions can be answered by prototyping the APIs necessary to make this work. Specifically I think we would at least need:
>
> 1. A `newAppendColumns` API added to the transaction <https://iceberg.apache.org/javadoc/1.9.1/org/apache/iceberg/Transaction.html> [2]. Lance's APIs might provide some inspiration [3] here.
> 2a. New abstractions to handle columns for the same rows split across files.
> 2b. New file-level APIs for:
>     - Append columns
>     - Delete files (if we decide on multiple files for a row-range and it is pushed down to the file level, the deletion logic needs to be delegated to the file level as well).
>
> Items 2a/2b depend on the ultimate approach taken, but trying to sketch these out and how they relate to the transaction API might help inform the decision on complexity.
>
> Other feature interactions probably need a more careful analysis when proposing the spec changes.
>
> Cheers,
> Micah
>
> [1] https://parquet.apache.org/docs/file-format/pageindex/
> [2] https://iceberg.apache.org/javadoc/1.9.1/org/apache/iceberg/Transaction.html
> [3] https://lancedb.github.io/lance/introduction/schema_evolution.html#adding-new-columns
>
> On Fri, Jun 6, 2025 at 10:39 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
>> Hi Peter
>>
>> Thanks for your message. It's an interesting topic.
>>
>> Would it not be more a data file/parquet "issue" ?
>> Especially with the data file API you are proposing, I think Iceberg should "delegate" to the data file layer (Parquet here) and Iceberg could be "agnostic".
>>
>> Regards
>> JB
>>
>> On Mon, May 26, 2025 at 6:28 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> In machine learning use-cases, it's common to encounter tables with a very high number of columns - sometimes even in the range of several thousand. I've seen cases with up to 15,000 columns. Storing such wide tables in a single Parquet file is often suboptimal, as Parquet can become a bottleneck, even when only a subset of columns is queried.
>>>
>>> A common approach to mitigate this is to split the data across multiple Parquet files. With the upcoming File Format API, we could introduce a layer that combines these files into a single iterator, enabling efficient reading of wide and very wide tables.
>>>
>>> To support this, we would need to revise the metadata specification. Instead of the current `_file` column, we could introduce a `_files` column containing:
>>> - `_file_column_ids`: the column IDs present in each file
>>> - `_file_path`: the path to the corresponding file
>>>
>>> Has there been any prior discussion around this idea?
>>> Is anyone else interested in exploring this further?
>>>
>>> Best regards,
>>> Peter
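To make the `_files` proposal at the bottom of the thread a bit more concrete, here is a minimal sketch of how one entry of such a column could be modeled. The record name, its fields, and the example paths are hypothetical illustrations only; none of this exists in the Iceberg spec or API today.

import java.util.List;

/**
 * Hypothetical model of one entry in the proposed `_files` metadata column.
 * Each entry names one data file and the column IDs it stores; a wide row
 * range would then be described by a list of such entries that a reader
 * merges back into a single row iterator.
 */
public record ColumnFamilyFile(
    String filePath,            // `_file_path`: path to the Parquet file holding this column family
    List<Integer> columnIds) {  // `_file_column_ids`: column IDs present in that file

  public static void main(String[] args) {
    // A wide table split into two column families might be tracked as:
    List<ColumnFamilyFile> files = List.of(
        new ColumnFamilyFile("s3://bucket/table/data/part-0-family-0.parquet", List.of(1, 2, 3)),
        new ColumnFamilyFile("s3://bucket/table/data/part-0-family-1.parquet", List.of(4, 5, 6)));
    files.forEach(f -> System.out.println(f.filePath() + " -> " + f.columnIds()));
  }
}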