[ https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weichen Xu reassigned SPARK-46361:
----------------------------------

    Assignee: Weichen Xu

> Add spark dataset chunk read API (python only)
> ----------------------------------------------
>
>                 Key: SPARK-46361
>                 URL: https://issues.apache.org/jira/browse/SPARK-46361
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, Spark Core
>    Affects Versions: 4.0.0
>            Reporter: Weichen Xu
>            Assignee: Weichen Xu
>            Priority: Major
>              Labels: pull-request-available
>
> *Design doc:*
> [https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.cxcvohcybvo2]
>
> *Proposed API:*
> {code:python}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
>     """
>     Persist and materialize the Spark dataframe as chunks; each chunk is an
>     Arrow batch.
>     It first tries to persist the data in Spark worker memory; if memory is
>     insufficient, it falls back to spilling the data to Spark worker local
>     disk.
>     Returns the list of chunk ids.
>     This function can only be called from the Spark driver process.
>     """
>
> def read_chunk(chunk_id: str):
>     """
>     Read a chunk by id and return it as an Arrow table.
>     This function can be called from the Spark driver, a Spark Python UDF
>     worker, or a descendant process of either.
>     """
>
> def unpersist_chunks(chunk_ids: list[str]) -> None:
>     """
>     Remove chunks by chunk ids.
>     This function can only be called from the Spark driver process.
>     """{code}
> *Motivation:*
> (1)
> In Ray on Spark, we want to support loading a Ray dataset from an arbitrary
> Spark DataFrame via in-memory conversion.
> In Ray on Spark, a Ray datasource read task runs as a child process of a Ray
> worker node, and the Ray worker node itself is launched as a child process of
> a PySpark UDF worker.
> The proposed API therefore lets a descendant Python process of a PySpark UDF
> worker read a chunk of a given Spark DataFrame, which enables an efficient
> "Spark DataFrame" to "Ray dataset" conversion.
> (2)
> For the Spark Torch distributor, we want to implement an efficient {{Torch
> DataLoader}} that loads data from a Spark DataFrame without saving the
> DataFrame to cloud storage. This API makes that feasible; this use case
> follows a similar pattern to use case (1).
> (3)
> For the petastorm Spark dataset converter (see
> [https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html]),
> the added API enables better performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
