The smallest unit of work you can do on a Parquet file (under the Delta
hood) is a Parquet row group, which by default is 128 MB. If you set
maxPartitionBytes to 10 MB, Spark will plan a partition for every 10 MB
slice of each file, but whichever partition covers the offset where a
row group starts consumes the entire row group. With 128 MB row groups,
roughly 12 of every 13 planned partitions end up with no data, which is
why you're seeing a lot of empty partitions and a small number holding
the rest of the actual data.
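
You can verify the layout yourself by inspecting the row-group metadata
of the underlying files, e.g. with pyarrow (the file path below is just
a placeholder for one of your table's data files):

import pyarrow.parquet as pq

# Placeholder path: point this at one of the Delta table's data files.
pf = pq.ParquetFile("/path/to/table/part-00000.snappy.parquet")
meta = pf.metadata
print(f"{meta.num_row_groups} row groups")
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    # total_byte_size is the uncompressed size of the row group's data.
    print(f"row group {i}: {rg.total_byte_size / 1024 / 1024:.1f} MB, "
          f"{rg.num_rows} rows")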

I can't think of any solution other than repartitioning (or rewriting
the input Delta table with a much smaller row group size, which wouldn't
be ideal performance-wise).
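
For reference, a rough (untested) sketch of both options; the paths,
join key, and partition count are placeholders for your setup:

# Option 1: repartition the expanded stream before the Pandas UDF runs.
# This pays for a shuffle but caps the per-partition input size.
stream = spark.readStream.format("delta").load("/path/to/source")
joined = stream.join(static_df, "key").repartition(1000)

# Option 2: rewrite the source table with a smaller Parquet row group
# size so maxPartitionBytes can actually split the files. This assumes
# the Parquet writer picks up the Hadoop "parquet.block.size" setting;
# scans of the rewritten table will be slower.
spark.sparkContext._jsc.hadoopConfiguration() \
    .setInt("parquet.block.size", 10 * 1024 * 1024)  # ~10 MB row groups
(spark.read.format("delta").load("/path/to/source")
    .write.format("delta").mode("overwrite")
    .save("/path/to/source-rewritten"))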

Adam

On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <chrisbcouti...@gmail.com>
wrote:

> Hello,
>
> We have a Spark Structured Streaming job that includes a stream-static
> join and a Pandas UDF, streaming to/from Delta tables. The join key of
> the static table is non-unique, so each streaming record joins to
> multiple static records - in our case a 100x increase. The Pandas UDF
> is then applied to the result of the stream-static join and the output
> is stored in a table. To avoid OOM errors on the executors, we need to
> start with very small (~10 MB) partitions to absorb the expansion.
> Currently this only seems possible by explicitly repartitioning the
> data, incurring the performance cost of the shuffle. Is it possible to
> force Spark to read Parquet files into 10 MB partitions without
> explicitly repartitioning?
>
> The documentation on Performance Tuning [0] suggests that it should be
> possible to control how Spark reads files into partitions - we're
> assuming this applies to Structured Streaming jobs as well. Based on
> our understanding of that page, we used the following settings to have
> Spark read a ~10 GB micro-batch into 1000 partitions of 10 MB each:
>
> spark.sql.files.openCostInBytes 128MB
> spark.sql.files.maxPartitionBytes 10MB
> spark.sql.files.minPartitionNum 1000
>
> Unfortunately, we still see a large number of empty partitions and a
> small number containing the rest of the data (see the median vs. max
> number of input records per partition below).
>
> [image: image.png]
>
> Any help would be much appreciated
>
> Chris
>


-- 
Adam Binford
