HyukjinKwon commented on code in PR #44170:
URL: https://github.com/apache/spark/pull/44170#discussion_r1423409469


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -146,16 +175,102 @@ def main(infile: IO, outfile: IO) -> None:
                 message_parameters={"type": "reader", "error": str(e)},
             )
 
-        # Construct a UDTF.
-        class PythonDataSourceReaderUDTF:
-            def __init__(self) -> None:
-                self.ser = CloudPickleSerializer()
+        # Wrap the data source read logic in an mapInArrow UDF.
+        import pyarrow as pa
+
+        # Create input converter.
+        input_fields = [input_schema.fields[0].name]
+        converter = ArrowTableToRowsConversion._create_converter(BinaryType())
+
+        # Create output converter.
+        return_type = schema
+        pa_schema = to_arrow_schema(return_type)
+        column_names = return_type.fieldNames()
+        column_converters = [
+            LocalDataToArrowConversion._create_converter(field.dataType)
+            for field in return_type.fields
+        ]
+
+        def data_source_read_func(iterator: Iterable[pa.RecordBatch]) -> 
Iterable[pa.RecordBatch]:
+            rows = []
+
+            # Get the partition value from the input iterator.
+            for batch in iterator:
+                # There should be only one row/column in the batch.
+                if batch.num_columns != 1 or batch.num_rows != 1:
+                    raise PySparkAssertionError(
+                        "Expected each batch to have exactly 1 column and 1 
row, "
+                        f"but found {batch.num_columns} columns and 
{batch.num_rows} rows."
+                    )
+                columns = [column.to_pylist() for column in batch.columns]
+                values = [converter(columns[0][0])]
+                rows.append(_create_row(fields=input_fields, values=values))
+
+            assert len(rows) == 1 and len(rows[0]) == 1
+
+            # Deserialize the partition value.
+            partition_bytes = rows[0][0]
+            partition = pickleSer.loads(partition_bytes)
+
+            if partition is not None and not isinstance(partition, 
InputPartition):
+                raise PySparkAssertionError(
+                    error_class="PYTHON_DATA_SOURCE_READ_ERROR",
+                    message_parameters={
+                        "expected": "a partition of type 'InputPartition'",
+                        "actual": f"'{type(partition).__name__}'",
+                    },
+                )
+
+            output_iter = reader.read(partition)  # type: ignore[arg-type]
+
+            # Validate the output iterator.
+            if not isinstance(output_iter, Iterator):
+                raise PySparkRuntimeError(
+                    error_class="PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE",
+                    message_parameters={
+                        "type": type(output_iter).__name__,
+                        "name": data_source.name(),
+                        "supported_types": "iterator",
+                    },
+                )
+
+            def batched(iterator: Iterator, n: int) -> Iterator:
+                return iter(functools.partial(lambda it: list(islice(it, n)), 
iterator), [])
+
+            max_batch_size = int(os.environ.get("ARROW_MAX_RECORDS_PER_BATCH", 
"10000"))

Review Comment:
   While this is probably fine for now because the batch size is unlikely to 
change often, we should ideally send the configuration through the socket. 
Otherwise, a new Python worker will be created whenever this configuration 
changes, instead of reusing an existing one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to