Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19646#discussion_r149210042 --- Diff: python/pyspark/sql/session.py --- @@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema): data = [schema.toInternal(row) for row in data] return self._sc.parallelize(data), schema + def _get_numpy_record_dtypes(self, rec): + """ + Used when converting a pandas.DataFrame to Spark using to_records(), this will correct + the dtypes of records so they can be properly loaded into Spark. + :param rec: a numpy record to check dtypes + :return corrected dtypes for a numpy.record or None if no correction needed + """ + import numpy as np + cur_dtypes = rec.dtype + col_names = cur_dtypes.names + record_type_list = [] + has_rec_fix = False + for i in xrange(len(cur_dtypes)): + curr_type = cur_dtypes[i] + # If type is a datetime64 timestamp, convert to microseconds + # NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs, + # conversion from [us] or lower will lead to py datetime objects, see SPARK-22417 + if curr_type == np.dtype('datetime64[ns]'): + curr_type = 'datetime64[us]' + has_rec_fix = True + record_type_list.append((str(col_names[i]), curr_type)) + return record_type_list if has_rec_fix else None + + def _convert_from_pandas(self, pdf, schema): + """ + Convert a pandas.DataFrame to list of records that can be used to make a DataFrame + :return tuple of list of records and schema + """ + # If no schema supplied by user then get the names of columns only + if schema is None: + schema = [str(x) for x in pdf.columns] + + # Convert pandas.DataFrame to list of numpy records + np_records = pdf.to_records(index=False) + + # Check if any columns need to be fixed for Spark to infer properly + if len(np_records) > 0: + record_type_list = self._get_numpy_record_dtypes(np_records[0]) --- End diff -- The dtype for a numpy record is in a different format ``` In [16]: r Out[16]: (0, datetime.date(2017, 11, 
6), 1509411661000000000L) In [17]: r.dtype Out[17]: dtype((numpy.record, [(u'index', '<i8'), (u'd', 'O'), (u'ts', '<M8[ns]')])) ``` so when using `numpy.record.astype()` the target dtype has to be specified in this same format and must include dtypes for all fields. If we try to do this with pandas dtypes from the DataFrame, there might be some differences that could cause errors, so I think it's safer to use the dtypes output from numpy and only change the timestamp resolution.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org