Hi,

Just trying to understand the problem before trying to solve it.

1. You mentioned "The primary key of the static table is non-unique". This
appears to me to be a design flaw.

2. You also mentioned "The Pandas UDF is then applied to the
resulting stream-static join and stored in a table. To avoid OOM errors on
the executors, we need to start with very small (~10MB) partitions to
account for the expansion. Currently this only seems possible by explicitly
repartitioning the data, incurring the perf cost associated with the
shuffle." Shouldn't a shuffle be happening anyway, since you are joining
the records?

Another question:
I am not sure what you mean by "keys". When you join the tables, are you
using a single column or multiple columns? Are you expecting a cartesian
product during the join, or will the number of records grow at most by the
number of duplicates per key in the static table?

Obviously I do not yet fully understand the problem, so all of these
suggestions may be wrong, but before over-engineering anything: have you
simply tried storing the data sorted on the PK (the non-unique one in the
static table) while running VACUUM?
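
Something along these lines, as an illustration only (a sketch; the column
name "pk" and the paths are placeholders, not your actual names):

```pyspark
# Sketch: rewrite the static table sorted on its non-unique primary key so
# that all rows for a given key end up co-located. "pk" and both paths are
# placeholders.
static_df = spark.read.format("delta").load("/path/to/static")

static_df.sort("pk") \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .save("/path/to/static_sorted")
```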

Of course, the above solution assumes that the volume of data for a
particular key in the static table fits into executor memory along with the
subsequent operations.

Another thing you might want to do is enable Adaptive Query Execution, and
verify that it is actually enabled by reading its settings.
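
For example, something like this (a sketch):

```pyspark
# Check whether Adaptive Query Execution is actually enabled, then turn it
# (and partition coalescing) on.
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```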


Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 6:00 PM Adam Binford <adam...@gmail.com> wrote:

> Writing to Delta might not support the write.option method. We set
> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
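>
> For example, something along these lines (a sketch; the 10 MB value is
> only illustrative):
>
> ```pyspark
> # Sketch: set the Parquet block (row group) size globally via the Spark
> # config rather than per write. The 10 MB value is illustrative only.
> from pyspark.sql import SparkSession
>
> spark = (
>     SparkSession.builder
>     .config("spark.hadoop.parquet.block.size", str(10 * 1024 * 1024))
>     .getOrCreate()
> )
> ```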
>
> Adam
>
> On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho <chrisbcouti...@gmail.com>
> wrote:
>
>> I tried re-writing the table with the updated block size but it doesn't
>> appear to have an effect on the row group size.
>>
>> ```pyspark
>> df = spark.read.format("delta").load("/path/to/source1")
>>
>> df.write \
>>     .format("delta") \
>>     .mode("overwrite") \
>>     .options(**{
>>         "parquet.block.size": "1m",
>>     }) \
>>     .partitionBy("date") \
>>     .save("/path/to/source2")
>> ```
>>
>> The files created by this job are about 20 MB in size. Using
>> `parquet-tools` I can inspect a single file and see that the following 12
>> MB file contains a single row group - not the expected 12 based on the 1m
>> block size:
>>
>> $ parquet-tools inspect /path/to/source2/date=.../part-0000.parquet
>> ############ file meta data ############
>> created_by: parquet-mr version 1.10.1-databricks9 (build
>> cf6c823f85c3b69d49e1573e48e236148c709e82)
>> num_columns: 19
>> num_rows: 369483
>> num_row_groups: 1
>> format_version: 1.0
>> serialized_size: 6364
>>
>> ############ Columns ############
>> ...
>>
>> Chris
>>
>> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen <sro...@gmail.com> wrote:
>>
>>> It should just be parquet.block.size indeed, e.g.
>>> df.write.option("parquet.block.size", "16m").parquet(...)
>>> This is an issue in how you write, not read, the parquet.
>>>
>>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho <chrisbcouti...@gmail.com>
>>> wrote:
>>>
>>>> Hi Adam,
>>>>
>>>> Thanks for the explanation on the empty partitions.
>>>>
>>>> We have the freedom to adjust how the source table is written, so if
>>>> there are any improvements we can implement on the source side we'd be
>>>> happy to look into that.
>>>>
>>>> It's not yet clear to me how to reduce the row group size of the
>>>> parquet files. I see some mention of `parquet.block.size` online, as well
>>>> as various MapReduce settings regarding file splitting (SO:
>>>> mapred-min-split-size-in-hdfs
>>>> <https://stackoverflow.com/questions/19188315/behavior-of-the-parameter-mapred-min-split-size-in-hdfs>);
>>>> however, I don't quite understand the link between the splitting settings,
>>>> the row group configuration, and the resulting number of records when
>>>> reading from a Delta table.
>>>>
>>>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
>>>> storage.
>>>>
>>>> Best,
>>>> Chris
>>>>
>>>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford <adam...@gmail.com> wrote:
>>>>
>>>>> The smallest unit of work you can do on a parquet file (under the
>>>>> delta hood) is based on the parquet row group size, which by default is
>>>>> 128 MB. If you specify maxPartitionBytes of 10 MB, Spark will basically
>>>>> create a partition for each 10 MB of a file, but whichever partition
>>>>> covers the part of the file where a row group starts will consume that
>>>>> entire row group. That's why you're seeing a lot of empty partitions
>>>>> and a small number with the rest of the actual data.
>>>>>
>>>>> Can't think of any solution other than repartitioning (or rewriting
>>>>> the input Delta table with a much smaller row group size, which
>>>>> wouldn't be ideal performance-wise).
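>>>>>
>>>>> To confirm what you're dealing with, you can inspect the row groups of
>>>>> one of the source files, e.g. with pyarrow (a sketch; the file path is
>>>>> a placeholder):
>>>>>
>>>>> ```pyspark
>>>>> # Sketch: print the row count and byte size of each row group in a
>>>>> # single Parquet file under the Delta table (path is a placeholder).
>>>>> import pyarrow.parquet as pq
>>>>>
>>>>> md = pq.ParquetFile("/path/to/source1/part-0000.parquet").metadata
>>>>> for i in range(md.num_row_groups):
>>>>>     rg = md.row_group(i)
>>>>>     print(i, rg.num_rows, rg.total_byte_size)
>>>>> ```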
>>>>>
>>>>> Adam
>>>>>
>>>>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <
>>>>> chrisbcouti...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We have a Spark structured streaming job that includes a
>>>>>> stream-static join and a Pandas UDF, streaming to/from delta tables.
>>>>>> The primary key of the static table is non-unique, meaning that the
>>>>>> streaming join results in multiple records per input record - in our
>>>>>> case a 100x increase. The Pandas UDF is then applied to the resulting
>>>>>> stream-static join and the result is stored in a table. To avoid OOM
>>>>>> errors on the executors, we need to start with very small (~10MB)
>>>>>> partitions to account for the expansion. Currently this only seems
>>>>>> possible by explicitly repartitioning the data, incurring the perf
>>>>>> cost associated with the shuffle. Is it possible to force Spark to
>>>>>> read parquet files into 10MB partitions without explicitly
>>>>>> repartitioning?
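>>>>>>
>>>>>> Roughly, the shape of the job is as follows (a hypothetical sketch;
>>>>>> the column names, paths, and UDF body are placeholders, not our
>>>>>> actual code):
>>>>>>
>>>>>> ```pyspark
>>>>>> # Hypothetical sketch of the pipeline shape described above.
>>>>>> import pandas as pd
>>>>>> from pyspark.sql.functions import pandas_udf
>>>>>>
>>>>>> @pandas_udf("double")
>>>>>> def heavy_udf(x: pd.Series) -> pd.Series:
>>>>>>     return x * 2.0  # stand-in for the real memory-hungry Pandas UDF
>>>>>>
>>>>>> stream_df = spark.readStream.format("delta").load("/path/to/stream")
>>>>>> static_df = spark.read.format("delta").load("/path/to/static")
>>>>>>
>>>>>> out = (
>>>>>>     stream_df
>>>>>>     .repartition(1000)        # the explicit shuffle we want to avoid
>>>>>>     .join(static_df, "pk")    # non-unique "pk": ~100x more rows
>>>>>>     .withColumn("y", heavy_udf("x"))
>>>>>> )
>>>>>>
>>>>>> query = (
>>>>>>     out.writeStream
>>>>>>     .format("delta")
>>>>>>     .option("checkpointLocation", "/path/to/checkpoint")
>>>>>>     .start("/path/to/sink")
>>>>>> )
>>>>>> ```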
>>>>>>
>>>>>> The documentation regarding Performance Tuning [0] suggests that it
>>>>>> should be possible to control how Spark reads files into partitions -
>>>>>> we're assuming this applies to structured streaming jobs as well.
>>>>>> Based on our understanding of the page, we used the following to
>>>>>> configure Spark to read a stream of 10GB per trigger into 1000
>>>>>> partitions of 10 MB each.
>>>>>>
>>>>>> spark.sql.files.openCostInBytes 128MB
>>>>>> spark.sql.files.maxPartitionBytes 10MB
>>>>>> spark.sql.files.minPartitionNum 1000
>>>>>>
>>>>>> Unfortunately we still see a large number of empty partitions and a
>>>>>> small number of partitions containing the rest of the data (see the
>>>>>> median vs. max number of input records below).
>>>>>>
>>>>>> [image: median vs. max number of input records per partition]
>>>>>>
>>>>>> Any help would be much appreciated
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Adam Binford
>>>>>
>>>>
