Writing to Delta might not support the write.option method. We set
spark.hadoop.parquet.block.size in our spark config for writing to Delta.
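
A minimal sketch of that kind of config in Python follows. The 1 MB value and the helper name `parquet_block_size_conf` are illustrative assumptions, not the settings used in this thread. The value is written in plain bytes, since parquet-mr documents `parquet.block.size` as a byte count.

```python
MB = 1024 * 1024


def parquet_block_size_conf(row_group_bytes: int) -> dict:
    """Build the Spark conf entry that sets the Parquet row group size.

    The "spark.hadoop." prefix makes Spark copy the property into the
    Hadoop configuration, which is where the Parquet writer looks it up.
    """
    return {"spark.hadoop.parquet.block.size": str(row_group_bytes)}


# Hypothetical target: 1 MB row groups.
conf = parquet_block_size_conf(1 * MB)
print(conf)

# Applying it when building the session (needs pyspark, not imported here):
#   builder = SparkSession.builder
#   for key, value in conf.items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```

Because this is set on the session rather than per write, it would apply to every Parquet file the session writes, including the files behind a Delta table.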

Adam

On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho <chrisbcouti...@gmail.com>
wrote:

> I tried re-writing the table with the updated block size but it doesn't
> appear to have an effect on the row group size.
>
> ```pyspark
> df = spark.read.format("delta").load("/path/to/source1")
>
> df.write \
> .format("delta") \
> .mode("overwrite") \
> .options(**{
>   "parquet.block.size": "1m",
> }) \
> .partitionBy("date") \
> .save("/path/to/source2")
> ```
>
> The files created by this job are about 20m in size. Using `parquet-tools`
> I can inspect a single file and see the following 12m file contains a
> single row group - not the expected 12 based on the block size:
>
> $ parquet-tools inspect /path/to/source2/date=.../part-0000.parquet
> ############ file meta data ############
> created_by: parquet-mr version 1.10.1-databricks9 (build cf6c823f85c3b69d49e1573e48e236148c709e82)
> num_columns: 19
> num_rows: 369483
> num_row_groups: 1
> format_version: 1.0
> serialized_size: 6364
>
> ############ Columns ############
> ...
>
> Chris
>
> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen <sro...@gmail.com> wrote:
>
>> It should just be parquet.block.size indeed.
>> df.write.option("parquet.block.size", "16m").parquet(...)
>> This is an issue in how you write, not read, the parquet.
>>
>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com>
>> wrote:
>>
>>> Hi Adam,
>>>
>>> Thanks for the explanation on the empty partitions.
>>>
>>> We have the freedom to adjust how the source table is written, so if
>>> there are any improvements we can implement on the source side we'd be
>>> happy to look into that.
>>>
>>> It's not yet clear to me how you can reduce the row group size of the
>>> parquet files. I see some mention of `parquet.block.size` online, as well
>>> as various map reduce settings regarding file splitting (SO:
>>> mapred-min-split-size-in-hdfs
>>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>>> however, I don't quite understand the link between the splitting settings,
>>> row group configuration, and resulting number of records when reading from
>>> a delta table.
>>>
>>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
>>> storage.
>>>
>>> Best,
>>> Chris
>>>
>>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>>>
>>>> The smallest unit of work you can do on a parquet file (under the delta
>>>> hood) is based on the parquet row group size, which by default is 128 MB. If
>>>> you specify maxPartitionBytes of 10 MB, what that will basically do is
>>>> create a partition for each 10 MB of a file, but whatever partition covers
>>>> the part of the file where the row group starts will consume the entire row
>>>> group. That's why you're seeing a lot of empty partitions and a small
>>>> number with the rest of the actual data.
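
The effect described above can be sketched with a toy model. It is a simplification under stated assumptions: splits are fixed-size byte ranges of one file, and a row group is read in full by whichever split contains its starting offset (the exact rule inside the Parquet reader may differ). The function name `assign_row_groups` and the offsets are hypothetical.

```python
MB = 1024 * 1024


def assign_row_groups(file_size, row_group_starts, split_size):
    """Return, for each split, the indices of the row groups it reads."""
    n_splits = -(-file_size // split_size)  # ceiling division
    splits = [[] for _ in range(n_splits)]
    for index, start in enumerate(row_group_starts):
        # The split covering the row group's first byte reads all of it.
        splits[start // split_size].append(index)
    return splits


# One 128 MB file holding a single row group that starts right after
# the 4-byte Parquet magic header, read with 10 MB splits.
splits = assign_row_groups(128 * MB, [4], 10 * MB)
non_empty = sum(1 for s in splits if s)
print(f"{len(splits)} splits, {non_empty} non-empty")
```

In this model 12 of the 13 splits have nothing to read, which matches the pattern of many empty partitions and a few large ones.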
>>>>
>>>> Can't think of any solution other than repartitioning (or rewriting the
>>>> input Delta table with a much smaller row group size which wouldn't be
>>>> ideal performance wise).
>>>>
>>>> Adam
>>>>
>>>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <
>>>> chrisbcouti...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have a spark structured streaming job that includes a stream-static
>>>>> join and a Pandas UDF, streaming to/from delta tables. The primary key of
>>>>> the static table is non-unique, meaning that the streaming join results in
>>>>> multiple records per input record - in our case a 100x increase. The Pandas
>>>>> UDF is then applied to the resulting stream-static join and stored in a
>>>>> table. To avoid OOM errors on the executors, we need to start with very
>>>>> small (~10MB) partitions to account for the expansion. Currently this only
>>>>> seems possible by explicitly repartitioning the data, incurring the perf
>>>>> cost associated with the shuffle. Is it possible to force spark to read
>>>>> parquet files into 10MB partitions without explicitly repartitioning?
>>>>>
>>>>> The documentation regarding Performance Tuning [0] suggests that it
>>>>> should be possible to control how spark reads files into partitions -
>>>>> we're assuming this accounts for structured streaming jobs as well. Based on our
>>>>> understanding of the page, we used the following to configure spark into
>>>>> reading a stream of 10GB per trigger into 1000 partitions of 10 MB each.
>>>>>
>>>>> spark.sql.files.openCostInBytes 128MB
>>>>> spark.sql.files.maxPartitionBytes 10MB
>>>>> spark.sql.files.minPartitionNum 1000
>>>>>
>>>>> Unfortunately we still see a large number of empty partitions and a
>>>>> small number containing the rest of the data (see median vs max number of
>>>>> input records).
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> Any help would be much appreciated
>>>>>
>>>>> Chris
>>>>>
>>>>
>>>>
>>>> --
>>>> Adam Binford
>>>>
>>>
