OK the number of partitions n or more to the point the "optimum" no of
partitions depends on the size of your batch data DF among other things and
the degree of parallelism at the end point where you will be writing to
sink. If you require high parallelism because your tasks are fine grained,
then a high number for n will be beneficial. Otherwise a coarser writes
that n may be smaller.Since you just want to use the API for
simple transformation, then keeping n smaller may help.

In short I don't think there is a magic number for n that fits all
occasions and likely there is no specific mention of optimum number for n
in the docs. It is likely an iterative process for you to determine the
value of the number of partitions that will work  for your
specific workload and that workload can change and you have to adjust n.
Spark GUI should help to get desired balance between parallelism and
resource efficiency.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 18 Jun 2023 at 12:08, Pengfei Li <lpengfei2...@gmail.com> wrote:

> Thanks for you reply. Yes, it's similar to what I want, although I'm using
> batch rather than structured streaming, but I think there is not much
> difference.
> What I want is similar to the property `numPartitions` of JDBC DataSource
> [1]. This is what it describes
> ```
> The maximum number of partitions that can be used for parallelism in table
> reading and writing. This also determines the maximum number of concurrent
> JDBC connections. If the number of partitions to write exceeds this limit,
> we decrease it to this limit by calling coalesce(numPartitions) before
> writing.
> ```
> I want to control the number of partitions using a similar way with `calling
> coalesce(numPartitions) ` because of lower cost of shuffle. The JDBC
> implements it like this [2]
> ```
> df.coalesce(n).rdd.foreachPartition { iterator => savePartition(
>       table, iterator, rddSchema, insertStmt, batchSize, dialect,
> isolationLevel, options)
>     }
> ```
> Is it possible to do that for DataSource V2?
>
> [1] https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
> [2]
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L877
>
> Thank You
> Best Regards
>
> Mich Talebzadeh <mich.talebza...@gmail.com> 于2023年6月18日周日 18:10写道:
>
>> Is this the point you are trying to implement?
>>
>> I have state data source which enables the state in SS --> Structured 
>> Streaming to be rewritten, which enables repartitioning, schema evolution, 
>> etc via batch query. The writer requires hash partitioning against group 
>> key, with the "desired number of partitions", which is same as what Spark 
>> does read and write against state.
>>
>> This is now implemented as DSv1, and the requirement is *simply done by 
>> calling repartition with the "desired number".*
>>
>> ```
>> val fullPathsForKeyColumns = keySchema.map(key => new 
>> Column(s"key.${key.name}"))
>> data
>>   .repartition(*newPartitions*, fullPathsForKeyColumns: _*)
>>   .queryExecution
>>   .toRdd
>>   .foreachPartition(
>>     writeFn(resolvedCpLocation, version, operatorId, storeName, keySchema, 
>> valueSchema,
>>       storeConf, hadoopConfBroadcast, queryId))
>>
>> Well Spark will not know the optimum value of newPartitions and you will 
>> need to work out that from SS size.
>>
>> Is that a correct understanding?
>>
>> HTH
>>
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 18 Jun 2023 at 10:12, Pengfei Li <lpengfei2...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I'm developing a DataSource on Spark 3.2 to write data to our system,
>>> and using DataSource V2 API. I want to implement the interface
>>> RequiresDistributionAndOrdering
>>> <https://github.com/apache/spark/blob/branch-3.2/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java>
>>>  to
>>> set the number of partitions used for write. But I don't know how to
>>> implement a distribution without shuffle as  RDD.coalesce does. Is there
>>> any example or advice?
>>>
>>> Thank You
>>> Best Regards
>>>
>>

Reply via email to