It should just be parquet.block.size indeed.
df.write.option("parquet.block.size", "16m").parquet(...)
This is an issue in how you write, not read, the parquet.
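
For example, a minimal sketch (placeholder DataFrame name and output path; a
plain byte count avoids any ambiguity over whether size suffixes are parsed):

# PySpark sketch: rewrite the source with ~16 MB row groups instead of the
# default 128 MB. parquet.block.size is the Parquet row group size; passed as
# a write option it is forwarded to the Parquet writer and has no effect at
# read time.
row_group_bytes = 16 * 1024 * 1024

(df.write
    .option("parquet.block.size", row_group_bytes)
    .mode("overwrite")
    .parquet("/path/to/output"))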

On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com>
wrote:

> Hi Adam,
>
> Thanks for the explanation on the empty partitions.
>
> We have the freedom to adjust how the source table is written, so if there
> are any improvements we can implement on the source side we'd be happy to
> look into that.
>
> It's not yet clear to me how to reduce the row group size of the parquet
> files. I see some mention of `parquet.block.size` online, as well as
> various MapReduce settings regarding file splitting (SO:
> mapred-min-split-size-in-hdfs
> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
> however, I don't quite understand the link between the splitting settings,
> the row group configuration, and the resulting number of records when
> reading from a Delta table.
>
> For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.
>
> Best,
> Chris
>
> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>
>> The smallest unit of work you can do on a parquet file (under the Delta
>> hood) is a parquet row group, which is 128 MB by default. If you specify
>> maxPartitionBytes of 10 MB, Spark will basically create a partition for
>> each 10 MB of a file, but whichever partition covers the offset where a
>> row group starts will consume that entire row group. That's why you're
>> seeing a lot of empty partitions and a small number holding the rest of
>> the actual data.
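>>
>> For reference, one way to inspect the row group layout of an existing file
>> is with PyArrow (a sketch; the path is a placeholder, and reading straight
>> from ADLS would need extra filesystem setup):
>>
>> import pyarrow.parquet as pq
>>
>> # Print the row count and reported byte size of each row group in one file
>> pf = pq.ParquetFile("/path/to/part-00000.snappy.parquet")
>> for i in range(pf.metadata.num_row_groups):
>>     rg = pf.metadata.row_group(i)
>>     print(i, rg.num_rows, rg.total_byte_size)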
>>
>> Can't think of any solution other than repartitioning (or rewriting the
>> input Delta table with a much smaller row group size, which wouldn't be
>> ideal performance-wise).
>>
>> Adam
>>
>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <chrisbcouti...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> We have a Spark structured streaming job that includes a stream-static
>>> join and a Pandas UDF, streaming to/from Delta tables. The primary key of
>>> the static table is non-unique, meaning that the stream-static join produces
>>> multiple records per input record - in our case a 100x increase. The Pandas
>>> UDF is then applied to the result of the join and the output is stored in a
>>> table. To avoid OOM errors on the executors, we need to start with very
>>> small (~10 MB) partitions to account for the expansion. Currently this only
>>> seems possible by explicitly repartitioning the data, incurring the perf
>>> cost of the shuffle. Is it possible to force Spark to read parquet files
>>> into 10 MB partitions without explicitly repartitioning?
>>>
>>> The documentation regarding Performance Tuning [0] suggests that it
>>> should be possible to control how Spark reads files into partitions - we're
>>> assuming this applies to structured streaming jobs as well. Based on our
>>> understanding of that page, we used the following settings to configure
>>> Spark to read a stream of 10 GB per trigger into 1000 partitions of 10 MB
>>> each.
>>>
>>> spark.sql.files.openCostInBytes 128MB
>>> spark.sql.files.maxPartitionBytes 10MB
>>> spark.sql.files.minPartitionNum 1000
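>>>
>>> (For reference, the same settings applied in code, roughly; the suffixed
>>> values above are just spelled out as byte counts here:)
>>>
>>> # Session-level settings that control how files are split into read partitions
>>> spark.conf.set("spark.sql.files.openCostInBytes", str(128 * 1024 * 1024))
>>> spark.conf.set("spark.sql.files.maxPartitionBytes", str(10 * 1024 * 1024))
>>> spark.conf.set("spark.sql.files.minPartitionNum", "1000")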
>>>
>>> Unfortunately we still see a large number of empty partitions and a
>>> small number containing the rest of the data (see median vs max number of
>>> input records).
>>>
>>> [image: image.png]
>>>
>>> Any help would be much appreciated
>>>
>>> Chris
>>>
>>
>>
>> --
>> Adam Binford
>>
>
