[ 
https://issues.apache.org/jira/browse/SPARK-37227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-37227:
---------------------------------
    Description: 
h2. Background

*Usability problem*

In the Scala APIs, we have added Arrow integration and developer APIs in Apache 
Spark, for example in {{ArrowConverters}}, but this is again inconvenient to use 
out of the box.

In PySpark, in order to use the Arrow format to connect to other external 
systems, users have to manually convert the pandas DataFrame inside a pandas UDF 
to an Arrow batch, which is inconvenient.
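
For illustration, a minimal sketch of that workaround today 
({{send_to_external_system}} is a hypothetical sink, named here only for the 
example): each pandas DataFrame produced inside {{mapInPandas}} has to be 
converted back to an Arrow batch by hand.

{code}
import pyarrow as pa

def send_to_external_system(batch: pa.RecordBatch) -> None:
    ...  # placeholder for, e.g., an Arrow IPC or Flight writer

def forward_as_arrow(iterator):
    for pdf in iterator:                         # one pandas DataFrame per batch
        batch = pa.RecordBatch.from_pandas(pdf)  # extra pandas -> Arrow copy
        send_to_external_system(batch)
        yield pdf                                # mapInPandas must yield pandas

df.mapInPandas(forward_as_arrow, df.schema).show()
{code}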


*Technical problem*

pandas UDFs are used in other use cases too. For example, users convert the 
pandas DataFrame back into an Arrow batch to integrate with other systems; see 
also https://github.com/apache/spark/pull/26783#issue-534127514.
However, this does not vectorize operations, because pandas does not support 
nested structures natively, and the performance impact seems non-trivial.
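
A quick illustration of the nested-structure limitation (plain pyarrow, no 
Spark involved): converting an Arrow struct column to pandas yields Python dict 
objects, over which pandas cannot vectorize.

{code}
import pyarrow as pa

batch = pa.record_batch(
    [pa.array([{"x": 1, "y": 2}, {"x": 3, "y": 4}])], names=["point"])
series = batch.to_pandas()["point"]
print(series.dtype)  # object -- each element is a plain Python dict
{code}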

In addition, the conversion between the pandas and Arrow formats effectively 
requires copying, which consumes computation (Spark internal format -> Arrow 
format -> pandas DataFrame -> Arrow format). See 
https://github.com/apache/spark/pull/26783#issue-534127514 for the performance 
impact.


h2. Other notes

See also:
- SPARK-30153
- SPARK-26413

h2. Proposal

I would like to propose an API, {{DataFrame.mapInArrow}}, similar to 
{{DataFrame.mapInPandas}} and {{RDD.mapPartitions}}.

The API shape would look like:

*Scala:*

{code}
def mapInArrow(
    f: Iterator[ArrowRecordBatch] => Iterator[ArrowRecordBatch],
    schema: StructType): DataFrame = {
  // ...
}
{code}

{code}
df.mapInArrow(_.map { case arrowBatch: ArrowRecordBatch =>
  // do something with `ArrowRecordBatch` and create new `ArrowRecordBatch`.
  // ...
  arrowBatch
}, df.schema).show()
{code}


*Python:*

{code}
def mapInArrow(
        self,
        func: Callable[[Iterator[pyarrow.RecordBatch]],
                       Iterator[pyarrow.RecordBatch]],
        schema: StructType) -> DataFrame:
    # ...
{code}

{code}
def do_something(iterator):
    for arrow_batch in iterator:
        # do something with `pyarrow.RecordBatch` and create a new
        # `pyarrow.RecordBatch`.
        # ...
        yield arrow_batch

df.mapInArrow(do_something, df.schema).show()
{code}
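
To make the proposal concrete, a hypothetical usage sketch against the proposed 
API above (it assumes a DataFrame whose only column is a numeric column named 
{{value}}); Arrow compute kernels can operate on each batch directly, with no 
pandas round trip:

{code}
import pyarrow as pa
import pyarrow.compute as pc

def double_value(iterator):
    for batch in iterator:                         # one pyarrow.RecordBatch at a time
        doubled = pc.multiply(batch.column(0), 2)  # vectorized Arrow kernel
        yield pa.RecordBatch.from_arrays([doubled], names=["value"])

df.mapInArrow(double_value, df.schema).show()
{code}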




> DataFrame.mapInArrow
> --------------------
>
>                 Key: SPARK-37227
>                 URL: https://issues.apache.org/jira/browse/SPARK-37227
>             Project: Spark
>          Issue Type: Umbrella
>          Components: PySpark, SQL
>    Affects Versions: 3.3.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>


