I tried rewriting the table with the updated block size, but it doesn't appear to have any effect on the row group size.
```pyspark
df = spark.read.format("delta").load("/path/to/source1")

df.write \
  .format("delta") \
  .mode("overwrite") \
  .options(**{
    "parquet.block.size": "1m",
  }) \
  .partitionBy("date") \
  .save("/path/to/source2")
```

The files created by this job are about 20m in size. Using `parquet-tools` I can inspect a single file and see that the following 12m file contains a single row group - not the expected 12 based on the block size:

```
$ parquet-tools inspect /path/to/source2/date=.../part-0000.parquet

############ file meta data ############
created_by: parquet-mr version 1.10.1-databricks9 (build cf6c823f85c3b69d49e1573e48e236148c709e82)
num_columns: 19
num_rows: 369483
num_row_groups: 1
format_version: 1.0
serialized_size: 6364

############ Columns ############
...
```

Chris
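For reference, a minimal, untested sketch of two things one could try based on Sean's suggestion quoted below. The output path is hypothetical, and the Hadoop-configuration line is an assumption rather than anything verified in this thread.

```pyspark
# Untested assumption: setting the key on the job-wide Hadoop configuration,
# before any write, may reach writers that ignore per-write options.
spark.sparkContext._jsc.hadoopConfiguration().set(
    "parquet.block.size", str(16 * 1024 * 1024))  # Sean's "16m" spelled out in bytes

# Control experiment (not run in this thread): write the same DataFrame as
# plain Parquet with the write option Sean mentions below. If these files end
# up with multiple row groups, the option itself works and the problem is
# specific to the Delta write path.
df = spark.read.format("delta").load("/path/to/source1")

df.write \
  .option("parquet.block.size", str(16 * 1024 * 1024)) \
  .mode("overwrite") \
  .parquet("/path/to/parquet-block-size-test")  # hypothetical path
```
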
On Fri, Feb 11, 2022 at 3:37 PM Sean Owen <sro...@gmail.com> wrote:

> It should just be parquet.block.size indeed.
> spark.write.option("parquet.block.size", "16m").parquet(...)
> This is an issue in how you write, not read, the parquet.
>
> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com> wrote:
>
>> Hi Adam,
>>
>> Thanks for the explanation on the empty partitions.
>>
>> We have the freedom to adjust how the source table is written, so if there
>> are any improvements we can implement on the source side we'd be happy to
>> look into that.
>>
>> It's not yet clear to me how to reduce the row group size of the Parquet
>> files. I see some mention of `parquet.block.size` online, as well as various
>> MapReduce settings regarding file splitting (SO: mapred-min-split-size-in-hdfs
>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>> however, I don't quite understand the link between the splitting settings,
>> the row group configuration, and the resulting number of records when
>> reading from a Delta table.
>>
>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.
>>
>> Best,
>> Chris
>>
>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>>
>>> The smallest unit of work you can do on a Parquet file (under the Delta
>>> hood) is based on the Parquet row group size, which by default is 128mb.
>>> If you specify maxPartitionBytes of 10mb, what that will basically do is
>>> create a partition for each 10mb of a file, but whichever partition covers
>>> the part of the file where the row group starts will consume the entire
>>> row group. That's why you're seeing a lot of empty partitions and a small
>>> number with the rest of the actual data.
>>>
>>> Can't think of any solution other than repartitioning (or rewriting the
>>> input Delta table with a much smaller row group size, which wouldn't be
>>> ideal performance-wise).
>>>
>>> Adam
>>>
>>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <chrisbcouti...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We have a Spark Structured Streaming job that includes a stream-static
>>>> join and a Pandas UDF, streaming to/from Delta tables. The primary key of
>>>> the static table is non-unique, meaning that the streaming join results
>>>> in multiple records per input record - in our case a 100x increase. The
>>>> Pandas UDF is then applied to the result of the stream-static join and
>>>> stored in a table. To avoid OOM errors on the executors, we need to start
>>>> with very small (~10MB) partitions to account for the expansion.
>>>> Currently this only seems possible by explicitly repartitioning the data,
>>>> incurring the perf cost associated with the shuffle. Is it possible to
>>>> force Spark to read Parquet files into 10MB partitions without explicitly
>>>> repartitioning?
>>>>
>>>> The documentation regarding Performance Tuning [0] suggests that it
>>>> should be possible to control how Spark reads files into partitions - we're
>>>> assuming this applies to Structured Streaming jobs as well. Based on our
>>>> understanding of the page, we used the following to configure Spark to
>>>> read a stream of 10GB per trigger into 1000 partitions of 10MB each:
>>>>
>>>> spark.sql.files.openCostInBytes 128MB
>>>> spark.sql.files.maxPartitionBytes 10MB
>>>> spark.sql.files.minPartitionNum 1000
>>>>
>>>> Unfortunately we still see a large number of empty partitions and a small
>>>> number containing the rest of the data (see median vs max number of input
>>>> records).
>>>>
>>>> [image: image.png]
>>>>
>>>> Any help would be much appreciated
>>>>
>>>> Chris
>>>
>>> --
>>> Adam Binford
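
For completeness, a sketch collecting the settings and the explicit-repartition fallback discussed in the thread above. The configuration values are the ones quoted in the thread; the input path and the partition count are hypothetical, and, as reported above, the settings alone did not remove the skew.

```pyspark
# Settings from the thread (Spark 3.1.2). Per Adam's explanation above, they
# cannot split a single Parquet row group, so partitions smaller than the row
# group stay mostly empty.
spark.conf.set("spark.sql.files.openCostInBytes", "128MB")
spark.conf.set("spark.sql.files.maxPartitionBytes", "10MB")
spark.conf.set("spark.sql.files.minPartitionNum", "1000")

stream = spark.readStream.format("delta").load("/path/to/source1")  # hypothetical path

# Fallback that works today, at the cost of a shuffle: explicitly repartition
# before the stream-static join / Pandas UDF so each task starts small.
stream = stream.repartition(1000)  # hypothetical partition count
```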