zhengruifeng commented on code in PR #44170:
URL: https://github.com/apache/spark/pull/44170#discussion_r1427498004


##########
python/pyspark/sql/worker/plan_data_source_read.py:
##########
@@ -146,16 +176,94 @@ def main(infile: IO, outfile: IO) -> None:
                 message_parameters={"type": "reader", "error": str(e)},
             )
 
-        # Construct a UDTF.
-        class PythonDataSourceReaderUDTF:
-            def __init__(self) -> None:
-                self.ser = CloudPickleSerializer()
+        # Wrap the data source read logic in an mapInArrow UDF.
+        import pyarrow as pa
+
+        # Create input converter.
+        converter = ArrowTableToRowsConversion._create_converter(BinaryType())
+
+        # Create output converter.
+        return_type = schema
+        pa_schema = to_arrow_schema(return_type)
+        column_names = return_type.fieldNames()
+        column_converters = [
+            LocalDataToArrowConversion._create_converter(field.dataType)

Review Comment:
   `LocalDataToArrowConversion` and `ArrowTableToRowsConversion` were not 
designed for the data source API, so I think we should look into them to make 
sure the behavior is as expected, e.g.
   1. there is a `_deduplicate_field_names` logic introduced in 
https://github.com/apache/spark/commit/71bac156b0df2ce9e56c1c1690806a4b40ffd615;
 not sure whether it should be used in the data source code path;
   2. IIRC, the internally used `to_arrow_schema` doesn't support all SQL types
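
   To illustrate why point 1 may matter, here is a minimal stdlib-only sketch 
of what goes wrong when a schema carries duplicate column names and how a 
dedup step could avoid it. The helper `dedup_field_names` and its `name_<index>` 
suffix scheme are hypothetical and written only for this example; they are not 
the `_deduplicate_field_names` logic from the commit above, and whether the 
data source path needs anything like it is exactly the open question.

```python
from collections import Counter
from typing import List


def dedup_field_names(names: List[str]) -> List[str]:
    """Hypothetical helper: rename only the names that occur more than once."""
    totals = Counter(names)   # how often each name appears overall
    seen: Counter = Counter()  # how many times each duplicate was emitted so far
    result = []
    for name in names:
        if totals[name] > 1:
            # Suffix duplicates with a running index (assumed scheme).
            result.append(f"{name}_{seen[name]}")
            seen[name] += 1
        else:
            result.append(name)
    return result


# Stand-in for `return_type.fieldNames()` with a repeated column name.
column_names = ["id", "value", "id"]

# Keying values by name silently drops one of the duplicate columns.
row = dict(zip(column_names, [1, "a", 2]))

# After deduplication every column keeps its own slot.
deduped = dedup_field_names(column_names)
deduped_row = dict(zip(deduped, [1, "a", 2]))
```

   If the converters key anything by field name, a test with a schema like 
`id, value, id` would show quickly whether the data source read path is affected.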



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

