Is this the point you are trying to implement?

I have state data source which enables the state in SS --> Structured
Streaming to be rewritten, which enables repartitioning, schema
evolution, etc via batch query. The writer requires hash partitioning
against group key, with the "desired number of partitions", which is
same as what Spark does read and write against state.

This is now implemented as DSv1, and the requirement is *simply done
by calling repartition with the "desired number".*

```
val fullPathsForKeyColumns = keySchema.map(key => new
Column(s"key.${key.name}"))
data
  .repartition(*newPartitions*, fullPathsForKeyColumns: _*)
  .queryExecution
  .toRdd
  .foreachPartition(
    writeFn(resolvedCpLocation, version, operatorId, storeName,
keySchema, valueSchema,
      storeConf, hadoopConfBroadcast, queryId))

Well Spark will not know the optimum value of newPartitions and you
will need to work out that from SS size.

Is that a correct understanding?

HTH


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 18 Jun 2023 at 10:12, Pengfei Li <lpengfei2...@gmail.com> wrote:

> Hi All,
>
> I'm developing a DataSource on Spark 3.2 to write data to our system,
> and using DataSource V2 API. I want to implement the interface
> RequiresDistributionAndOrdering
> <https://github.com/apache/spark/blob/branch-3.2/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java>
>  to
> set the number of partitions used for write. But I don't know how to
> implement a distribution without shuffle as  RDD.coalesce does. Is there
> any example or advice?
>
> Thank You
> Best Regards
>

Reply via email to