[ https://issues.apache.org/jira/browse/SPARK-55159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-55159:
-----------------------------------
    Labels: pull-request-available  (was: )

> Extract Arrow batch transformers from serializers for better composability
> --------------------------------------------------------------------------
>
>                 Key: SPARK-55159
>                 URL: https://issues.apache.org/jira/browse/SPARK-55159
>             Project: Spark
>          Issue Type: Umbrella
>          Components: PySpark
>    Affects Versions: 4.2.0
>            Reporter: Yicong Huang
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, PySpark's Arrow serializers (e.g., {{ArrowStreamUDFSerializer}}, {{ArrowStreamPandasSerializer}}) mix two concerns:
> 1. *Serialization*: reading and writing Arrow IPC streams
> 2. *Data transformation*: flattening structs, wrapping columns, converting to pandas, etc.
> *Proposed approach (3 phases):*
> *Phase 1: Extract transformers to conversion.py*
> Extract the transformation logic into an {{ArrowBatchTransformer}} class with static methods in {{pyspark.sql.conversion}}. Serializers call these transformers internally.
> {code:python}
> import pyarrow as pa
>
>
> class ArrowBatchTransformer:
>     @staticmethod
>     def flatten_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
>         """Flatten a single struct column into a RecordBatch."""
>         struct = batch.column(0)
>         return pa.RecordBatch.from_arrays(
>             struct.flatten(), schema=pa.schema(struct.type)
>         )
>
>     @staticmethod
>     def wrap_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
>         """Wrap a RecordBatch's columns into a single struct column."""
>         if batch.num_columns == 0:
>             # Zero-column batch: build an empty struct value per row.
>             struct = pa.array([{}] * batch.num_rows)
>         else:
>             struct = pa.StructArray.from_arrays(
>                 batch.columns, fields=pa.struct(list(batch.schema))
>             )
>         return pa.RecordBatch.from_arrays([struct], ["_0"])
> {code}
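> A minimal usage sketch (illustrative only; the batch contents here are made up) shows the two transformers round-tripping a batch:
> {code:python}
> import pyarrow as pa
>
> batch = pa.RecordBatch.from_arrays(
>     [pa.array([1, 2]), pa.array(["a", "b"])], names=["id", "name"]
> )
>
> wrapped = ArrowBatchTransformer.wrap_struct(batch)
> assert wrapped.schema.names == ["_0"]
>
> restored = ArrowBatchTransformer.flatten_struct(wrapped)
> assert restored.schema.names == ["id", "name"]
> {code}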
> Serializers use these via {{map()}}:
> {code:python}
> class ArrowStreamUDFSerializer(ArrowStreamSerializer):
>     def load_stream(self, stream):
>         batches = super().load_stream(stream)
>         flattened = map(ArrowBatchTransformer.flatten_struct, batches)
>         # Wrap each flattened batch in a single-element list, as
>         # downstream consumers expect.
>         return map(lambda batch: [batch], flattened)
>
>     def dump_stream(self, iterator, stream):
>         # Each element of `iterator` is a tuple whose first item is a RecordBatch.
>         batches = map(lambda x: ArrowBatchTransformer.wrap_struct(x[0]), iterator)
>         ...
> {code}
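> Keeping the transformers pure lets {{load_stream}}/{{dump_stream}} compose them lazily with {{map()}}, while protocol details such as the {{START_ARROW_STREAM}} marker stay in the serializer's I/O path.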
> *Phase 2: Reduce serializer complexity*
> - Reduce the inheritance depth of the serializer hierarchy
> - Simplify serializer implementations using the extracted transformers (see the sketch below)
> - Remove transformation logic duplicated across serializers
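> As a rough sketch of the target shape (the class name here is hypothetical, not from this issue), a serializer built on the extracted transformers reduces to thin I/O glue:
> {code:python}
> from pyspark.sql.conversion import ArrowBatchTransformer  # Phase 1 location
>
>
> # Hypothetical name; assumes the existing ArrowStreamSerializer base class.
> class SimpleArrowUDFSerializer(ArrowStreamSerializer):
>     def load_stream(self, stream):
>         # One transformer call per batch; no transformation logic here.
>         return map(ArrowBatchTransformer.flatten_struct, super().load_stream(stream))
>
>     def dump_stream(self, iterator, stream):
>         wrapped = map(ArrowBatchTransformer.wrap_struct, iterator)
>         return super().dump_stream(wrapped, stream)
> {code}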
> *Phase 3: Make transformers usable outside serializers*
> - Enable direct use of transformers in custom Arrow processing pipelines
> - Support chaining transformers for complex transformations (see the sketch below)
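> One possible shape for chaining (a sketch, not part of this issue's text; {{chain}} is a hypothetical helper): treat each transformer as a pure function {{RecordBatch -> RecordBatch}} and fold a list of them over each batch:
> {code:python}
> from functools import reduce
> from typing import Callable
>
> import pyarrow as pa
>
> from pyspark.sql.conversion import ArrowBatchTransformer  # Phase 1 location
>
> # A transformer is a pure function RecordBatch -> RecordBatch.
> BatchTransformer = Callable[[pa.RecordBatch], pa.RecordBatch]
>
>
> def chain(*transformers: BatchTransformer) -> BatchTransformer:
>     """Compose transformers left to right into a single transformer."""
>     return lambda batch: reduce(lambda b, t: t(b), transformers, batch)
>
>
> # Example: wrap columns into a struct, then flatten back to the original columns.
> pipeline = chain(
>     ArrowBatchTransformer.wrap_struct,
>     ArrowBatchTransformer.flatten_struct,
> )
> {code}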
> *Benefits:*
> - Clear separation of concerns (serialization vs transformation)
> - Transformers are reusable and testable in isolation
> - Easier to understand data flow as a pipeline
> - Transformers have no side effects (I/O stays in serializers)
> *Design principles:*
> - Transformers: Pure functions {{RecordBatch -> RecordBatch}}, no side effects
> - Serializers: Handle I/O, protocol details (e.g., START_ARROW_STREAM marker)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
