GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/22795

    [SPARK-25798][PYTHON] Internally document type conversion between Pandas data and SQL types in Pandas UDFs

    ## What changes were proposed in this pull request?
    
    We are facing some problems with type conversions between Pandas data and SQL types in Pandas UDFs.
    It is even difficult to identify the problems (see #20163 and #22610).
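    
    As a minimal sketch of the kind of surprise this table captures (assuming a running SparkSession named `spark` with PyArrow installed; the UDF here is made up for illustration), a Pandas UDF can declare one SQL type while producing Pandas data of another:
    
    ```python
    from pyspark.sql.functions import pandas_udf
    
    # The declared SQL return type is int, but the UDF produces float64 data;
    # what the user observes (truncation, nulls, or an error, depending on the
    # version) is decided by the internal conversion this table documents.
    plus_half = pandas_udf(lambda s: s + 0.5, 'int')
    spark.range(3).select(plus_half('id')).show()
    ```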
    
    This PR aims to internally document the type conversion table. Some of the conversions look buggy and we should fix them.
    
    The table can be generated via the code below:
    
    ```python
    from pyspark.sql.types import *
    from pyspark.sql.functions import pandas_udf
    
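    # Each pair is (DataFrame column name, Pandas dtype label shown in the table).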
    columns = [
        ('none', 'object(NoneType)'),
        ('bool', 'bool'),
        ('int8', 'int8'),
        ('int16', 'int16'),
        ('int32', 'int32'),
        ('int64', 'int64'),
        ('uint8', 'uint8'),
        ('uint16', 'uint16'),
        ('uint32', 'uint32'),
        ('uint64', 'uint64'),
        ('float16', 'float16'),
        ('float32', 'float32'),
        ('float64', 'float64'),
        ('date', 'datetime64[ns]'),
        ('tz_aware_dates', 'datetime64[ns, US/Eastern]'),
        ('string', 'object(string)'),
        ('decimal', 'object(Decimal)'),
        ('array', 'object(array[int32])'),
        ('float128', 'float128'),
        ('complex64', 'complex64'),
        ('complex128', 'complex128'),
        ('category', 'category'),
        ('tdeltas', 'timedelta64[ns]'),
    ]
    
    def create_dataframe():
        import pandas as pd
        import numpy as np
        import decimal
        pdf = pd.DataFrame({
            'none': [None, None],
            'bool': [True, False],
            'int8': np.arange(1, 3).astype('int8'),
            'int16': np.arange(1, 3).astype('int16'),
            'int32': np.arange(1, 3).astype('int32'),
            'int64': np.arange(1, 3).astype('int64'),
            'uint8': np.arange(1, 3).astype('uint8'),
            'uint16': np.arange(1, 3).astype('uint16'),
            'uint32': np.arange(1, 3).astype('uint32'),
            'uint64': np.arange(1, 3).astype('uint64'),
            'float16': np.arange(1, 3).astype('float16'),
            'float32': np.arange(1, 3).astype('float32'),
            'float64': np.arange(1, 3).astype('float64'),
            'float128': np.arange(1, 3).astype('float128'),
            'complex64': np.arange(1, 3).astype('complex64'),
            'complex128': np.arange(1, 3).astype('complex128'),
            'string': list('ab'),
            'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]),
            'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]),
            'date': pd.date_range('19700101', periods=2).values,
            'category': pd.Series(list("AB")).astype('category')})
        pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]]
        pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern')
        return pdf
    
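    # SQL return types to try against each Pandas column above.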
    types = [
        BooleanType(),
        ByteType(),
        ShortType(),
        IntegerType(),
        LongType(),
        FloatType(),
        DoubleType(),
        DateType(),
        TimestampType(),
        StringType(),
        DecimalType(10, 0),
        ArrayType(IntegerType()),
        MapType(StringType(), IntegerType()),
        StructType([StructField("_1", IntegerType())]),
        BinaryType(),
    ]
    
    df = spark.range(2).repartition(1)
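    # A single partition keeps both rows in one batch, matching the 2-row
    # Series each test UDF returns regardless of its input.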
    results = []
    count = 0
    total = len(types) * len(columns)
    values = []
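    # Reduce Spark's log output so the progress lines below stay readable.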
    spark.sparkContext.setLogLevel("FATAL")
    for t in types:
        result = []
        for column, pandas_t in columns:
            v = create_dataframe()[column][0]
            values.append(v)
            try:
                row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first()
                ret_str = repr(row[0])
            except Exception:
                ret_str = "X"
            result.append(ret_str)
            progress = "SQL Type: [%s]\n  Pandas Value(Type): %s(%s)]\n  Result 
Python Value: [%s]" % (
                t.simpleString(), v, pandas_t, ret_str)
            count += 1
            print("%s/%s:\n  %s" % (count, total, progress))
        results.append([t.simpleString()] + list(map(str, result)))
    
    
    schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda 
values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, 
columns)))
    strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 
20, False)
    print("\n".join(map(lambda line: "    # %s  # noqa" % line, 
strings.strip().split("\n"))))
    
    ```
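    
    Note that the final `print` above renders each table row as a `    # <row>  # noqa` line, so the output can be pasted verbatim into the Python source as the internal comment block (the `# noqa` suffix keeps the long table lines from tripping the linter).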
    
    This code is compatible with both Python 2 and 3, but the table was generated under Python 2.
    
    ## How was this patch tested?
    
    Manually tested, and lint check passed.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-25798

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22795.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22795
    
----
commit b361c20473cd4276fbbee6e27ff26bf71539762f
Author: hyukjinkwon <gurwls223@...>
Date:   2018-10-22T10:47:38Z

    Internally document type conversion between Pandas data and SQL types in Pandas UDFs

commit 53561240bf5fed6a52e6ffef4f81e7128524365a
Author: hyukjinkwon <gurwls223@...>
Date:   2018-10-22T14:01:38Z

    Let's just list up everything

----

