I tried re-writing the table with the updated block size but it doesn't
appear to have an effect on the row group size.

```pyspark
df = spark.read.format("delta").load("/path/to/source1")

df.write \
.format("delta") \
.mode("overwrite") \
.options(**{
  "parquet.block.size": "1m",
}) \
.partitionBy("date") \
.save("/path/to/source2")
```

The files created by this job are about 20m in size. Using `parquet-tools`
I can inspect a single file and see the following 12m file contains a
single row group - not the expected 12 based on the block size:

$ parquet-tools inspect /path/to/source2/date=.../part-0000.parquet
############ file meta data ############
created_by: parquet-mr version 1.10.1-databricks9 (build
cf6c823f85c3b69d49e1573e48e236148c709e82)
num_columns: 19
num_rows: 369483
num_row_groups: 1
format_version: 1.0
serialized_size: 6364

############ Columns ############
...

Chris

On Fri, Feb 11, 2022 at 3:37 PM Sean Owen <sro...@gmail.com> wrote:

> It should just be parquet.block.size indeed.
> spark.write.option("parquet.block.size", "16m").parquet(...)
> This is an issue in how you write, not read, the parquet.
>
> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com>
> wrote:
>
>> Hi Adam,
>>
>> Thanks for the explanation on the empty partitions.
>>
>> We have the freedom to adjust how the source table is written, so if
>> there are any improvements we can implement on the source side we'd be
>> happy to look into that.
>>
>> It's not yet clear to me how you can reduce the row group size of the
>> parquet files, I see some mention of `parquet.block.size` online , as well
>> as various map reduce settings regarding file splitting (SO:
>> mapred-min-split-size-in-hdfs
>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>> however, I don't quite understand the link between the splitting settings,
>> row group configuration, and resulting number of records when reading from
>> a delta table.
>>
>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.
>>
>> Best,
>> Chris
>>
>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>>
>>> The smallest unit of work you can do on a parquet file (under the delta
>>> hood) is based on the parquet row group size, which by default is 128mb. If
>>> you specify maxPartitionBytes of 10mb, what that will basically do is
>>> create a partition for each 10mb of a file, but whatever partition covers
>>> the part of the file where the row group starts will consume the entire row
>>> group. That's why you're seeing a lot of empty partitions and a small
>>> number with the rest of the actual data.
>>>
>>> Can't think of any solution other than repartitioning (or rewriting the
>>> input Delta table with a much smaller row group size which wouldn't be
>>> ideal performance wise).
>>>
>>> Adam
>>>
>>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <chrisbcouti...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> We have a spark structured streaming job that includes a stream-static
>>>> join and a Pandas UDF, streaming to/from delta tables. The primary key of
>>>> the static table is non-unique, meaning that the streaming join results in
>>>> multiple records per input record - in our case 100x increase. The Pandas
>>>> UDF is then applied to the resulting stream-static join and stored in a
>>>> table. To avoid OOM errors on the executors, we need to start with very
>>>> small (~10MB) partitions to account for the expansion. Currently this only
>>>> seems possible by explicitly repartitioning the data, incurring the perf
>>>> cost associated with the shuffle. Is it possible to force spark to read
>>>> parquet files into 10MB partitions without explicitly repartitioning?
>>>>
>>>> The documentation regarding Performance Tuning [0] suggests that it
>>>> should be possible to control how spark reads files into partitions - we're
>>>> assuming this accounts for structured streaming jobs as well. Based on our
>>>> understanding of the page, we used the following to configure spark into
>>>> reading a stream of 10GB per trigger into 1000 partitions 10 MB each.
>>>>
>>>> spark.sql.files.openCostInBytes 128MB
>>>> spark.sql.files.maxPartitionBytes 10MB
>>>> spark.sql.files.minPartitionNum 1000
>>>>
>>>> Unfortunately we still see a large number of empty partitions and a
>>>> small number containing the rest of the data (see median vs max number of
>>>> input records).
>>>>
>>>> [image: image.png]
>>>>
>>>> Any help would be much appreciated
>>>>
>>>> Chris
>>>>
>>>
>>>
>>> --
>>> Adam Binford
>>>
>>

Reply via email to