Writing to Delta might not support the `write.option()` method. We set
spark.hadoop.parquet.block.size in our Spark config for writing to Delta.
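For reference, a rough sketch of what that looks like when building the
session; the app name and the 8 MB value are illustrative placeholders,
not our actual settings:

```pyspark
from pyspark.sql import SparkSession

# spark.hadoop.* properties are forwarded to the underlying Hadoop
# configuration, so this sets the Parquet row group ("block") size used
# by the writer. The 8 MB value is a placeholder.
spark = (
    SparkSession.builder
    .appName("delta-writer")  # placeholder
    .config("spark.hadoop.parquet.block.size", 8 * 1024 * 1024)
    .getOrCreate()
)
```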
Adam

On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho <chrisbcouti...@gmail.com>
wrote:

> I tried re-writing the table with the updated block size, but it doesn't
> appear to have an effect on the row group size.
>
> ```pyspark
> df = spark.read.format("delta").load("/path/to/source1")
>
> df.write \
>     .format("delta") \
>     .mode("overwrite") \
>     .options(**{
>         "parquet.block.size": "1m",
>     }) \
>     .partitionBy("date") \
>     .save("/path/to/source2")
> ```
>
> The files created by this job are about 20m in size. Using `parquet-tools`
> I can inspect a single file and see that the following 12m file contains
> a single row group, not the ~12 expected based on the block size:
>
> $ parquet-tools inspect /path/to/source2/date=.../part-0000.parquet
> ############ file meta data ############
> created_by: parquet-mr version 1.10.1-databricks9 (build
> cf6c823f85c3b69d49e1573e48e236148c709e82)
> num_columns: 19
> num_rows: 369483
> num_row_groups: 1
> format_version: 1.0
> serialized_size: 6364
>
> ############ Columns ############
> ...
>
> Chris
>
> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen <sro...@gmail.com> wrote:
>
>> It should just be parquet.block.size indeed:
>> df.write.option("parquet.block.size", "16m").parquet(...)
>> This is an issue in how you write, not read, the parquet.
>>
>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com>
>> wrote:
>>
>>> Hi Adam,
>>>
>>> Thanks for the explanation of the empty partitions.
>>>
>>> We have the freedom to adjust how the source table is written, so if
>>> there are any improvements we can implement on the source side, we'd
>>> be happy to look into that.
>>>
>>> It's not yet clear to me how to reduce the row group size of the
>>> parquet files. I see some mention of `parquet.block.size` online, as
>>> well as various MapReduce settings regarding file splitting (SO:
>>> mapred-min-split-size-in-hdfs
>>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>>> however, I don't quite understand the link between the splitting
>>> settings, the row group configuration, and the resulting number of
>>> records when reading from a delta table.
>>>
>>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
>>> storage.
>>>
>>> Best,
>>> Chris
>>>
>>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>>>
>>>> The smallest unit of work you can do on a parquet file (under the
>>>> delta hood) is based on the parquet row group size, which by default
>>>> is 128 MB. If you specify maxPartitionBytes of 10 MB, what that will
>>>> basically do is create a partition for each 10 MB of a file, but
>>>> whichever partition covers the part of the file where a row group
>>>> starts will consume the entire row group. That's why you're seeing a
>>>> lot of empty partitions and a small number with the rest of the
>>>> actual data.
>>>>
>>>> I can't think of any solution other than repartitioning (or
>>>> rewriting the input Delta table with a much smaller row group size,
>>>> which wouldn't be ideal performance-wise).
>>>>
>>>> Adam
>>>>
>>>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <chrisbcouti...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have a Spark Structured Streaming job that includes a
>>>>> stream-static join and a Pandas UDF, streaming to/from delta
>>>>> tables. The primary key of the static table is non-unique, meaning
>>>>> that the streaming join results in multiple records per input
>>>>> record, in our case a 100x increase. The Pandas UDF is then applied
>>>>> to the result of the stream-static join and the output is stored in
>>>>> a table. To avoid OOM errors on the executors, we need to start
>>>>> with very small (~10 MB) partitions to account for the expansion.
>>>>> Currently this only seems possible by explicitly repartitioning the
>>>>> data, incurring the performance cost associated with the shuffle.
>>>>> Is it possible to force Spark to read parquet files into 10 MB
>>>>> partitions without explicitly repartitioning?
>>>>>
>>>>> The documentation regarding Performance Tuning [0] suggests that it
>>>>> should be possible to control how Spark reads files into
>>>>> partitions; we're assuming this applies to Structured Streaming
>>>>> jobs as well. Based on our understanding of the page, we used the
>>>>> following to configure Spark to read a stream of 10 GB per trigger
>>>>> into 1000 partitions of 10 MB each:
>>>>>
>>>>> spark.sql.files.openCostInBytes 128MB
>>>>> spark.sql.files.maxPartitionBytes 10MB
>>>>> spark.sql.files.minPartitionNum 1000
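>>>>>
>>>>> Concretely, that corresponds to something like the following on the
>>>>> session before starting the stream (spark.conf.set is just one way
>>>>> to apply these; they could equally go in spark-defaults.conf):
>>>>>
>>>>> ```pyspark
>>>>> # Mirror of the settings listed above, applied at runtime.
>>>>> spark.conf.set("spark.sql.files.openCostInBytes", "128MB")
>>>>> spark.conf.set("spark.sql.files.maxPartitionBytes", "10MB")
>>>>> spark.conf.set("spark.sql.files.minPartitionNum", "1000")
>>>>> ```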
>>>>>
>>>>> Unfortunately, we still see a large number of empty partitions and
>>>>> a small number containing the rest of the data (see the median vs.
>>>>> max number of input records below).
>>>>>
>>>>> [image: per-partition input records, median vs. max]
>>>>>
>>>>> Any help would be much appreciated.
>>>>>
>>>>> Chris
>>>>
>>>> --
>>>> Adam Binford
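For completeness, the explicit-repartition workaround discussed above
would look roughly like the sketch below. The path and the partition
count of 1000 are placeholders; the count would be tuned so that each
partition lands around 10 MB:

```pyspark
# Hypothetical sketch: force small partitions up front by repartitioning
# the streaming input before the stream-static join and Pandas UDF.
stream = (
    spark.readStream
    .format("delta")
    .load("/path/to/source")  # placeholder path
    .repartition(1000)        # placeholder count; incurs a shuffle
)
```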