[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149867086

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Ah, in that case, maybe we need to revert one of the two original patches and fix them one by one, or merge the two follow-ups into one as a hot-fix PR.

cc @gatorsmile @cloud-fan
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149866007

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

BTW, @ueshin, the `branch-2.2` Jenkins build will fail due to #19701. Could you merge #19701 first?
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149865875

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Great, @ueshin! :)
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149865798

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Thank you, @BryanCutler!
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149865739

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

I can take it over. I'll submit a PR soon.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149863519

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

I can add a patch a little bit later tonight unless someone is able to get to it first.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149863279

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Thanks @dongjoon-hyun for tracking this down. It looks like sql/tests.py for branch-2.2 is just missing the following:

```
_have_pandas = False
try:
    import pandas
    _have_pandas = True
except:
    # No Pandas, but that's okay, we'll skip those tests
    pass
```

This was probably added in an earlier PR in master and wasn't included when this was cherry-picked.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149860091

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Hi, @cloud-fan and @BryanCutler. This seems to break `branch-2.2`, but it has been hidden behind another SQL error. (cc @gatorsmile, @henryr)

Please see [this](https://github.com/apache/spark/pull/19701#issuecomment-343037369).

cc @felixcheung since he is the RM for 2.2.1.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/19646
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149491411

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema

+    def _get_numpy_record_dtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+            # conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('datetime64[ns]'):
+                curr_type = 'datetime64[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convert_from_pandas(self, pdf, schema):
--- End diff --

Yes, it isn't used in the conversion any more, but I think it should still be here for the case when it's None and is then assigned to a list of the pdf column names. That way we can keep all pandas-related code in this method.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149490465

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+        # Check if any columns need to be fixed for Spark to infer properly
+        if len(np_records) > 0:
+            record_type_list = self._get_numpy_record_dtypes(np_records[0])
+            if record_type_list is not None:
+                return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

I don't think this will increase performance; we will still have to iterate over each record and convert it to a list, in addition to making a copy of the timestamp data.

Another issue is that using `DataFrame.astype` will truncate the resolution to microseconds, but pandas will continue to store the data as `datetime64[ns]`; see https://stackoverflow.com/a/32827472. This means we would have to change the conversion routine to separate all columns in the DataFrame and manually convert them to rows of records instead of using `to_records()` and `tolist()`.

I think it would be best to keep the casting on the numpy side; it's safer and keeps things simpler.
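To make the numpy-side cast concrete, here is a minimal standalone sketch of the mechanism used in the diff above (the column name and timestamp value are illustrative, not taken from the PR's tests):

```
import numpy as np
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})

# pandas stores the column as datetime64[ns]; tolist() on the raw records
# would therefore produce plain integers (nanoseconds since the epoch)
np_records = pdf.to_records(index=False)

# cast the record dtype down to microsecond resolution first, as the diff does
record_type_list = [("ts", "datetime64[us]")]
rows = [r.astype(record_type_list).tolist() for r in np_records]
# rows[0] is now a tuple of Python objects: (datetime.datetime(2017, 10, 31, 1, 1, 1),)
```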
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149283762

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+    def _convert_from_pandas(self, pdf, schema):
--- End diff --

I guess we can remove the `schema` parameter from here because the `schema` doesn't affect the conversion now.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149212192

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+        # Check if any columns need to be fixed for Spark to infer properly
+        if len(np_records) > 0:
+            record_type_list = self._get_numpy_record_dtypes(np_records[0])
+            if record_type_list is not None:
+                return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

OK, let's copy it. Is it a valid idea to use `DataFrame.astype`?
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149211050

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+        # Check if any columns need to be fixed for Spark to infer properly
+        if len(np_records) > 0:
+            record_type_list = self._get_numpy_record_dtypes(np_records[0])
+            if record_type_list is not None:
+                return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

Then that would modify the user's input pandas.DataFrame, which would be bad if they use it after this call. Making a copy of the DataFrame might not be good either if it is large.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149210042

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+        # Check if any columns need to be fixed for Spark to infer properly
+        if len(np_records) > 0:
+            record_type_list = self._get_numpy_record_dtypes(np_records[0])
--- End diff --

The dtype for a numpy record is in a different format:

```
In [16]: r
Out[16]: (0, datetime.date(2017, 11, 6), 15094116610L)

In [17]: r.dtype
Out[17]: dtype((numpy.record, [(u'index', '
```
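For reference, a small sketch of the format difference being described (column names and the exact dtype repr are illustrative and platform-dependent): `pdf.dtypes` is a per-column Series, while a record carries one structured dtype bundling all fields.

```
import pandas as pd

pdf = pd.DataFrame({"a": [1], "ts": [pd.Timestamp("2017-11-06")]})

pdf.dtypes    # a per-column Series: a -> int64, ts -> datetime64[ns]

rec = pdf.to_records(index=False)[0]
rec.dtype     # one structured dtype covering all fields, e.g.
              # dtype((numpy.record, [('a', '<i8'), ('ts', '<M8[ns]')]))
rec.dtype[1]  # dtype('<M8[ns]') -- the per-field dtypes themselves do match pdf.dtypes
```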
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149207295

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+        # Check if any columns need to be fixed for Spark to infer properly
+        if len(np_records) > 0:
+            record_type_list = self._get_numpy_record_dtypes(np_records[0])
+            if record_type_list is not None:
+                return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

Instead of doing this, we should call `DataFrame.astype`, which accepts a Python dict. Then we can create a dict that maps column name to corrected dtype (only including the columns that need to be cast). We can also specify `copy=False` for better performance.
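A hedged sketch of what this suggestion might look like (`pdf` is the input pandas.DataFrame from context; as the replies above explain, this approach was ultimately not adopted, since it mutates or copies the user's DataFrame and pandas may keep `datetime64[ns]` storage or reject the cast depending on version):

```
import numpy as np

# build a dict of just the columns that need the cast, then cast in one call
cast_map = {str(name): 'datetime64[us]'
            for name, dt in zip(pdf.columns, pdf.dtypes)
            if dt == np.dtype('datetime64[ns]')}
if cast_map:
    pdf = pdf.astype(cast_map, copy=False)
```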
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149206178

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+        # Check if any columns need to be fixed for Spark to infer properly
+        if len(np_records) > 0:
+            record_type_list = self._get_numpy_record_dtypes(np_records[0])
--- End diff --

Can't we just use `pdf.dtypes`?

```
>>> pdf.dtypes[0]
dtype('int64')
>>> pdf.to_records(index=False)[0].dtype[0]
dtype('int64')
```

I think they are the same.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149196424

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

The problem is that nanosecond values cannot be converted to a Python datetime object, which only has microsecond resolution, so numpy converts them to longs. Numpy will convert microseconds and above to Python datetime objects, which Spark will correctly infer.

> according to the ticket, seems we need to convert numpy.datetime64 to python datetime manually.

This fix is just meant to convert nanosecond timestamps to microseconds so that calling `tolist()` can fit them in a Python object. Does it seem OK to you guys to leave it at that scope for now?
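A minimal sketch of the numpy behavior described above (the timestamp value is illustrative):

```
import numpy as np

ts = np.array(['2017-10-31T01:01:01.000000001'], dtype='datetime64[ns]')

ts.tolist()
# -> a list of plain Python ints/longs (nanoseconds since the epoch);
#    ns resolution does not fit in datetime.datetime

ts.astype('datetime64[us]').tolist()
# -> [datetime.datetime(2017, 10, 31, 1, 1, 1)];
#    us and coarser resolutions become Python datetime objects
```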
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149193192

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
[...]
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

`to_records` makes a numpy array of numpy records, and the timestamp dtype is `datetime64`. Calling `tolist()` on a record converts everything to a list of Python objects.
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149192441

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,16 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
+
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_create_dataframe_from_pandas_with_timestamp(self):
+        import pandas as pd
+        from datetime import datetime
+        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+                            "d": [pd.Timestamp.now().date()]})
+        df = self.spark.createDataFrame(pdf)
--- End diff --

Yes, it looks like that needs to be fixed also. I thought it was working when a schema was supplied, but I'll double-check and add that into the tests.
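A hypothetical sketch of the extra schema-supplied case mentioned here (field names mirror the test above; the exact assertion is illustrative):

```
from pyspark.sql.types import StructType, StructField, DateType, TimestampType

schema = StructType([StructField("d", DateType(), True),
                     StructField("ts", TimestampType(), True)])
df = self.spark.createDataFrame(pdf, schema=schema)
# the explicitly supplied schema should survive the pandas conversion
self.assertEqual(df.schema, schema)
```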
[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r149192058

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema

+    def _getNumpyRecordDtypes(self, rec):
--- End diff --

Yeah, I agree we should be using lowercase with underscores, which is more the convention for Python. I was only using this format to stay consistent with the rest of the file, but I can change it. Just for the new methods, right?