Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19646#discussion_r149210042
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _get_numpy_record_dtypes(self, rec):
    +        """
    +        Used when converting a pandas.DataFrame to Spark using 
to_records(), this will correct
    +        the dtypes of records so they can be properly loaded into Spark.
    +        :param rec: a numpy record to check dtypes
    +        :return corrected dtypes for a numpy.record or None if no 
correction needed
    +        """
    +        import numpy as np
    +        cur_dtypes = rec.dtype
    +        col_names = cur_dtypes.names
    +        record_type_list = []
    +        has_rec_fix = False
    +        for i in xrange(len(cur_dtypes)):
    +            curr_type = cur_dtypes[i]
    +            # If type is a datetime64 timestamp, convert to microseconds
    +            # NOTE: if dtype is datetime[ns] then np.record.tolist() will 
output values as longs,
    +            # conversion from [us] or lower will lead to py datetime 
objects, see SPARK-22417
    +            if curr_type == np.dtype('datetime64[ns]'):
    +                curr_type = 'datetime64[us]'
    +                has_rec_fix = True
    +            record_type_list.append((str(col_names[i]), curr_type))
    +        return record_type_list if has_rec_fix else None
    +
    +    def _convert_from_pandas(self, pdf, schema):
    +        """
    +         Convert a pandas.DataFrame to list of records that can be used to 
make a DataFrame
    +         :return tuple of list of records and schema
    +        """
    +        # If no schema supplied by user then get the names of columns only
    +        if schema is None:
    +            schema = [str(x) for x in pdf.columns]
    +
    +        # Convert pandas.DataFrame to list of numpy records
    +        np_records = pdf.to_records(index=False)
    +
    +        # Check if any columns need to be fixed for Spark to infer properly
    +        if len(np_records) > 0:
    +            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    --- End diff --
    
    The dtype for a numpy record is in a different format
    ```
    n [16]: r
    Out[16]: (0, datetime.date(2017, 11, 6), 1509411661000000000L)
    
    In [17]: r.dtype
    Out[17]: dtype((numpy.record, [(u'index', '<i8'), (u'd', 'O'), (u'ts', 
'<M8[ns]')]))
    ```
    so when using `numpy.record.astype()` it has to be specified in the same 
format and include dtypes for all fields.  If we try to do this with pandas 
dtypes from the DataFrame, there might be some differences that could cause 
errors, so I think it's safer to use the dtypes output from numpy and only 
change the timestamp resolution.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to