Hello,

We have a Spark Structured Streaming job that includes a stream-static join
and a Pandas UDF, streaming to/from Delta tables. The join key in the
static table is non-unique, so the stream-static join produces multiple
output records per input record - in our case roughly a 100x increase. The
Pandas UDF is then applied to the join output and the result is written to
a Delta table. To avoid OOM errors on the executors, we need to start with
very small (~10MB) partitions to account for the expansion. Currently this
only seems possible by explicitly repartitioning the data, incurring the
performance cost of the shuffle. Is it possible to force Spark to read the
Parquet files into ~10MB partitions without explicitly repartitioning?
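For context, a stripped-down sketch of the job is below - the table paths,
join key, and UDF body are placeholders standing in for the real ones, and
the explicit repartition is the step we'd like to avoid:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import pandas_udf
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Placeholder standing in for our actual Pandas UDF.
@pandas_udf("double")
def score(v: pd.Series) -> pd.Series:
    return v * 2.0

# Static side of the join; the join key has duplicates, hence the expansion.
static_df = spark.read.format("delta").load("/tables/static")

stream_df = (
    spark.readStream.format("delta").load("/tables/input")
    # The explicit repartition into small partitions that we'd like to avoid.
    .repartition(1000)
)

result = (
    stream_df.join(static_df, on="key", how="inner")  # ~100x more rows
    .withColumn("score", score(F.col("value")))
)

query = (
    result.writeStream.format("delta")
    .option("checkpointLocation", "/checkpoints/job")
    .start("/tables/output")
)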

The Performance Tuning documentation [0] suggests that it should be
possible to control how Spark reads files into partitions - we're assuming
this applies to Structured Streaming jobs as well. Based on our
understanding of that page, we used the following settings to have Spark
read roughly 10GB per trigger into ~1,000 partitions of ~10MB each.

spark.sql.files.openCostInBytes 128MB
spark.sql.files.maxPartitionBytes 10MB
spark.sql.files.minPartitionNum 1000
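
For completeness, a minimal sketch of how these can be applied on the
session (equivalently they can be passed with --conf at submit time):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Raise the estimated cost of opening a file, so fewer files get packed
    # into one partition.
    .config("spark.sql.files.openCostInBytes", "128MB")
    # Cap the bytes packed into a single partition when reading files.
    .config("spark.sql.files.maxPartitionBytes", "10MB")
    # Suggested (not guaranteed) minimum number of split file partitions.
    .config("spark.sql.files.minPartitionNum", "1000")
    .getOrCreate()
)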

Unfortunately we still see a large number of empty partitions and a small
number of partitions containing the rest of the data (see the median vs.
max number of input records below).

[image: image.png]

Any help would be much appreciated

Chris
