This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fe50fcdb6a [SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs
8fe50fcdb6a is described below

commit 8fe50fcdb6a34b06c07c235f497b77cc5e245877
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Fri Aug 11 10:23:01 2023 +0900

    [SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs
    
    ### What changes were proposed in this pull request?
    
    This PR caches the pandas converter for reuse when serializing the results from arrow-optimized Python UDTFs.
    
    ### Why are the changes needed?
    
    To improve the performance
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    Closes #42439 from allisonwang-db/spark-44766-cache-converter.
    
    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/pandas/serializers.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index d1a3babb1fd..2cc3db15c9c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -499,6 +499,7 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
             # Enables explicit casting for mismatched return types of Arrow Python UDTFs.
             arrow_cast=True,
         )
+        self._converter_map = dict()
 
     def _create_batch(self, series):
         """
@@ -538,6 +539,17 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
         return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
 
+    def _get_or_create_converter_from_pandas(self, dt):
+        if dt not in self._converter_map:
+            conv = _create_converter_from_pandas(
+                dt,
+                timezone=self._timezone,
+                error_on_duplicated_field_names=False,
+                ignore_unexpected_complex_type_values=True,
+            )
+            self._converter_map[dt] = conv
+        return self._converter_map[dt]
+
     def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
         """
         Override the `_create_array` method in the superclass to create an 
Arrow Array
@@ -569,13 +581,7 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
         if arrow_type is not None:
             dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
-            # TODO(SPARK-43579): cache the converter for reuse
-            conv = _create_converter_from_pandas(
-                dt,
-                timezone=self._timezone,
-                error_on_duplicated_field_names=False,
-                ignore_unexpected_complex_type_values=True,
-            )
+            conv = self._get_or_create_converter_from_pandas(dt)
             series = conv(series)
 
         if hasattr(series.array, "__arrow_array__"):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to