[ https://issues.apache.org/jira/browse/SPARK-46361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Weichen Xu reassigned SPARK-46361:
----------------------------------

    Assignee: Weichen Xu

> Add spark dataset chunk read API (python only)
> ----------------------------------------------
>
>                 Key: SPARK-46361
>                 URL: https://issues.apache.org/jira/browse/SPARK-46361
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, Spark Core
>    Affects Versions: 4.0.0
>            Reporter: Weichen Xu
>            Assignee: Weichen Xu
>            Priority: Major
>              Labels: pull-request-available
>
> *Design doc:*
> [https://docs.google.com/document/d/1LHzwCjm2SluHkta_08cM3jxFSgfF-niaCZbtIThG-H8/edit#heading=h.cxcvohcybvo2]
>
> *Proposed API:*
> {code:python}
> def persist_dataframe_as_chunks(dataframe: DataFrame) -> list[str]:
>     """
>     Persist and materialize the Spark dataframe as chunks; each chunk is an
>     Arrow batch. It tries to persist data to Spark worker memory first; if
>     memory is insufficient, it falls back to spilling data to Spark worker
>     local disk. Returns the list of chunk ids.
>     This function is only available when called from the Spark driver
>     process.
>     """
>
> def read_chunk(chunk_id):
>     """
>     Read a chunk by id and return it as an Arrow table.
>     This function can be called from the Spark driver, a Spark Python UDF
>     worker, a descendant process of the Spark driver, or a descendant
>     process of a Spark Python UDF worker.
>     """
>
> def unpersist_chunks(chunk_ids: list[str]) -> None:
>     """
>     Remove chunks by chunk ids.
>     This function is only available when called from the Spark driver
>     process.
>     """{code}
> *Motivation:*
> (1)
> In Ray on Spark, we want to support loading Ray data from an arbitrary Spark
> DataFrame with in-memory conversion. In Ray on Spark, a Ray datasource
> read-task runs as a child process of a Ray worker node, and we launch Ray
> worker nodes as child processes of PySpark UDF workers.
> The proposed API therefore allows a descendant Python process of a PySpark
> UDF worker to read chunk data of a given Spark DataFrame; based on this, we
> can achieve an efficient "Spark DataFrame" to "Ray dataset" conversion.
> (2)
> For the Spark Torch distributor, we want to implement an efficient {{Torch
> DataLoader}} that loads data from a Spark DataFrame without saving the
> DataFrame to cloud storage. This API makes that feasible; the use case
> follows the same pattern as (1).
> (3)
> For the petastorm Spark dataset converter (see
> [https://www.databricks.com/blog/2020/06/16/simplify-data-conversion-from-apache-spark-to-tensorflow-and-pytorch.html]),
> using the added API, we can achieve better performance.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)