Enrico Minack created SPARK-40559:
-------------------------------------

             Summary: Add applyInArrow to pyspark.sql.GroupedData
                 Key: SPARK-40559
                 URL: https://issues.apache.org/jira/browse/SPARK-40559
             Project: Spark
          Issue Type: New Feature
          Components: PySpark
    Affects Versions: 3.4.0
            Reporter: Enrico Minack


PySpark allows a {{DataFrame}} to be transformed via the Pandas and Arrow APIs:

{code:python}
def map_arrow(iter: Iterator[pyarrow.RecordBatch]) -> Iterator[pyarrow.RecordBatch]:
    return iter

def map_pandas(iter: Iterator[pandas.DataFrame]) -> Iterator[pandas.DataFrame]:
    return iter

df.mapInArrow(map_arrow, schema="...")
df.mapInPandas(map_pandas, schema="...")
{code}

A grouped {{DataFrame}} currently supports only the Pandas API:

{code:python}
def apply_pandas(df: pandas.DataFrame) -> pandas.DataFrame:
    return df

df.groupBy("id").applyInPandas(apply_pandas, schema="...")
{code}

A similar method for the Arrow API would be useful, especially given that Arrow is 
used by many other libraries. An Arrow-based method allows the 
{{DataFrame}} to be processed with *any* Arrow-based API, e.g. Polars:

{code:python}
def apply_polars(df: polars.DataFrame) -> polars.DataFrame:
    return df

def apply_arrow(iter: Iterator[pyarrow.RecordBatch]) -> Iterator[pyarrow.RecordBatch]:
    for batch in iter:
        df = polars.from_arrow(pyarrow.Table.from_batches([batch]))
        for b in apply_polars(df).to_arrow().to_batches():
            yield b

df.groupBy("id").applyInArrow(apply_arrow, schema="...")
{code}
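Such a user function can also be exercised and tested locally without Spark, since it only consumes and produces Arrow data. A minimal sketch (column names {{id}} and {{v}} are illustrative, not part of any proposed API), using only {{pyarrow.compute}}:

{code:python}
from typing import Iterator

import pyarrow as pa
import pyarrow.compute as pc

def apply_arrow(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    # Double the "v" column of each incoming batch, keeping "id" unchanged.
    for batch in batches:
        v_idx = batch.schema.get_field_index("v")
        doubled = pc.multiply(batch.column(v_idx), 2)
        yield pa.RecordBatch.from_arrays(
            [batch.column(batch.schema.get_field_index("id")), doubled],
            names=["id", "v"],
        )

# Simulate the batches of one group locally:
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 1, 1]), pa.array([1.0, 2.0, 3.0])], names=["id", "v"]
)
result = list(apply_arrow(iter([batch])))
{code}

This mirrors how {{mapInArrow}} hands batches to the user function today, so a grouped {{applyInArrow}} could reuse the same iterator-of-batches contract per group.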
