It should just be parquet.block.size indeed, and the value is the row group size in bytes, e.g.:

    df.write.option("parquet.block.size", 16 * 1024 * 1024).parquet(...)

This is an issue in how you write, not read, the parquet.
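For example, a minimal sketch of rewriting the source table with smaller row groups (PySpark is assumed; df, the path, and the 16 MB figure are placeholders you would tune for your record size):

    # Rewrite the static/source table with ~16 MB Parquet row groups; the value
    # is in bytes (the Parquet default is 128 MB). Smaller row groups let
    # spark.sql.files.maxPartitionBytes actually produce small, non-empty
    # partitions when the table is read back.
    (
        df.write
        .option("parquet.block.size", 16 * 1024 * 1024)
        .mode("overwrite")
        .parquet("/path/to/source_table")
    )

If the source has to stay a Delta table, setting spark.hadoop.parquet.block.size on the writing job's Spark config should reach the underlying Parquet writer the same way, since spark.hadoop.* properties are copied into the Hadoop configuration the writer uses.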
On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com> wrote:
> Hi Adam,
>
> Thanks for the explanation on the empty partitions.
>
> We have the freedom to adjust how the source table is written, so if there are any improvements we can implement on the source side we'd be happy to look into that.
>
> It's not yet clear to me how to reduce the row group size of the parquet files. I see some mention of `parquet.block.size` online, as well as various MapReduce settings regarding file splitting (SO: mapred-min-split-size-in-hdfs <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>); however, I don't quite understand the link between the splitting settings, row group configuration, and resulting number of records when reading from a Delta table.
>
> For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.
>
> Best,
> Chris
>
> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>
>> The smallest unit of work you can do on a parquet file (under the Delta hood) is based on the parquet row group size, which by default is 128 MB. If you specify maxPartitionBytes of 10 MB, what that will basically do is create a partition for each 10 MB of a file, but whatever partition covers the part of the file where the row group starts will consume the entire row group. That's why you're seeing a lot of empty partitions and a small number with the rest of the actual data.
>>
>> Can't think of any solution other than repartitioning (or rewriting the input Delta table with a much smaller row group size, which wouldn't be ideal performance-wise).
>>
>> Adam
>>
>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <chrisbcouti...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> We have a Spark Structured Streaming job that includes a stream-static join and a Pandas UDF, streaming to/from Delta tables. The primary key of the static table is non-unique, meaning that the streaming join results in multiple records per input record - in our case a 100x increase. The Pandas UDF is then applied to the resulting stream-static join and stored in a table. To avoid OOM errors on the executors, we need to start with very small (~10 MB) partitions to account for the expansion. Currently this only seems possible by explicitly repartitioning the data, incurring the perf cost associated with the shuffle. Is it possible to force Spark to read parquet files into 10 MB partitions without explicitly repartitioning?
>>>
>>> The documentation regarding Performance Tuning [0] suggests that it should be possible to control how Spark reads files into partitions - we're assuming this applies to Structured Streaming jobs as well. Based on our understanding of the page, we used the following to configure Spark to read a stream of 10 GB per trigger into 1000 partitions of 10 MB each.
>>>
>>> spark.sql.files.openCostInBytes 128MB
>>> spark.sql.files.maxPartitionBytes 10MB
>>> spark.sql.files.minPartitionNum 1000
>>>
>>> Unfortunately we still see a large number of empty partitions and a small number containing the rest of the data (see median vs max number of input records).
>>>
>>> [image: image.png]
>>>
>>> Any help would be much appreciated
>>>
>>> Chris
>>
>>
>> --
>> Adam Binford
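For reference, a minimal sketch of the explicit-repartition fallback discussed above (PySpark is assumed; the table path and the partition count of 1000 are illustrative, not taken from the thread):

    # Force small partitions with an explicit shuffle right after the read,
    # before the stream-static join and the Pandas UDF multiply the row count.
    stream = (
        spark.readStream
        .format("delta")
        .load("/path/to/source_table")
        .repartition(1000)  # choose so each partition lands around ~10 MB
    )

This trades the shuffle cost for predictable partition sizes; rewriting the source with smaller row groups (as sketched at the top) avoids the shuffle but, as Adam notes, isn't ideal performance-wise.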