[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148791016

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

According to the ticket, it seems we need to convert numpy.datetime64 to Python datetime manually.
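For reference, a minimal sketch of such a manual conversion (my own illustration, not code from the PR): casting a nanosecond-precision numpy.datetime64 down to microseconds first is what lets NumPy hand back a Python datetime instead of a raw integer.

```python
import datetime
import numpy as np

def datetime64_to_datetime(value):
    # datetime64[ns] converts to a plain integer (nanoseconds since the
    # epoch); casting to microsecond precision first yields a datetime.
    return value.astype('datetime64[us]').astype(datetime.datetime)

ts = np.datetime64('2017-10-31T01:01:01.000000000')  # datetime64[ns]
print(datetime64_to_datetime(ts))                    # 2017-10-31 01:01:01
```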
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148784896

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

(It reminds me of [SPARK-6857](https://issues.apache.org/jira/browse/SPARK-6857), BTW)
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148782931

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

but ... `numpy.datetime64` is not supported in `createDataFrame` IIUC:

```python
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
print [[v for v in r] for r in pdf.to_records(index=False)]
spark.createDataFrame([[v for v in r] for r in pdf.to_records(index=False)])
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 591, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 404, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/.../spark/python/pyspark/sql/session.py", line 336, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/.../spark/python/pyspark/sql/types.py", line 1095, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/.../spark/python/pyspark/sql/types.py", line 1072, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.datetime64'>
```
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148779399

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

seems `r.tolist` is the problem, how about `r[i] for i in xrange(r.size)`? Then we can get `numpy.datetime64`:

```
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0].tolist()[0]
15094116610L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T02:01:01.0+0100')
```
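A runnable sketch of the behavior being discussed (my own repro, updated to Python 3 syntax; exact values depend on your pandas/NumPy versions):

```python
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
rec = pdf.to_records(index=False)[0]

# tolist() converts the datetime64[ns] field to a plain integer
# (nanoseconds since the epoch), which Spark would infer as LongType.
print(rec.tolist()[0], type(rec.tolist()[0]))

# Indexing keeps the numpy.datetime64 scalar, which Spark's type
# inference rejects outright -- so neither form works unmodified.
print(rec[0], type(rec[0]))
```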
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148778328

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

thanks! I also tried the data type:

```
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).dtypes
ts    datetime64[ns]
dtype: object
>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).dtypes
d    object
dtype: object
```
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148773536

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

I got:

```python
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0].tolist()[0]
15094116610L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T01:01:01.0')
```

whereas:

```python
>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).to_records(index=False)[0].tolist()[0]
datetime.date(2017, 11, 3)
>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).to_records(index=False)[0][0]
datetime.date(2017, 11, 3)
```
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148771775

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,16 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_create_dataframe_from_pandas_with_timestamp(self):
+        import pandas as pd
+        from datetime import datetime
+        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+                            "d": [pd.Timestamp.now().date()]})
+        df = self.spark.createDataFrame(pdf)
--- End diff --

I was checking this PR and ran this out of curiosity. I got:

```python
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                    "d": [pd.Timestamp.now().date()]})
spark.createDataFrame(pdf, "d date, ts timestamp")
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 587, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 401, in _createFromLocal
    data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 567, in prepare
    verify_func(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1405, in verify_default
    verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types
    % (dataType, obj, type(obj)))
TypeError: field ts: TimestampType can not accept object 15094116610L in type <type 'long'>
```
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148770218

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
--- End diff --

I know it's confusing, but I usually use thisNamingRule mainly for API consistency, and otherwise use this_naming_rule. I actually checked and read the documentation and other code a few times to clarify this for myself. I believe this_naming_rule is preferred by PEP 8. I know the [doc](https://www.python.org/dev/peps/pep-0008/#function-names) says:

> mixedCase is allowed only in contexts where that's already the prevailing style (e.g. threading.py), to retain backwards compatibility.

but I believe we should avoid thisNamingRule if it's for internal use in particular and/or unrelated to compatibility. To my knowledge, `threading.py` is a similar case.
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148763906

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
--- End diff --

ditto for naming
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148763235

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
--- End diff --

nit: `_getNumpyRecordDtypes` -> `_get_numpy_record_dtypes`.
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148724335

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

after `to_records`, what's the type of the timestamp value? Python datetime?
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148707442

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
--- End diff --

Ooops, I forgot about that. thx!
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148707441

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
+                    curr_type = cur_dtypes[i]
+                    # If type is a datetime64 timestamp, convert to microseconds
+                    # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+                    # this conversion will lead to an output of py datetime objects, see SPARK-22417
+                    if curr_type == np.dtype('M8[ns]'):
+                        curr_type = 'M8[us]'
+                        has_rec_fix = True
+                    record_type_list.append((str(col_names[i]), curr_type))
+                if not has_rec_fix:
+                    record_type_list = None
--- End diff --

Yeah, probably a good idea. I'll see if I can clean it up some. Thanks @viirya !
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148709143

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
--- End diff --

There shouldn't be any difference for the most part. I only used `M8` here because, when debugging these types, that is what was being output for the record types by `numpy.record.dtype`. Would you prefer `datetime64` if that works?
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148708752

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
--- End diff --

Isn't this `datetime64[ns]`? What's the difference between `M8[ns]` and `datetime64[ns]`?
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148709362

--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
--- End diff --

Yes, I'd prefer it if that works; otherwise I'd like you to add a comment saying we can use `M8[ns]` instead of `datetime64[ns]`.
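For what it's worth, the two spellings denote the same dtype; 'M8' is NumPy's type-character alias for 'datetime64', as this quick check (my own, not from the PR) shows:

```python
import numpy as np

# 'M8' is the array-protocol type string for the datetime64 kind,
# so both spellings construct an identical dtype object.
assert np.dtype('M8[ns]') == np.dtype('datetime64[ns]')
print(np.dtype('M8[ns]'))  # prints: datetime64[ns]
```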
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148709757

--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,16 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_create_dataframe_from_pandas_with_timestamp(self):
+        import pandas as pd
+        from datetime import datetime
+        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+                            "d": [pd.Timestamp.now().date()]})
+        df = self.spark.createDataFrame(pdf)
--- End diff --

What if we specify the schema? For example:

```
df = self.spark.createDataFrame(pdf, "ts timestamp, d date")
```
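A sketch of how the test could cover both paths, with inferred and explicit schemas; the assertions here are my illustration, not the PR's final test, and `spark` is assumed to be an active SparkSession:

```python
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                    "d": [pd.Timestamp.now().date()]})

df_inferred = spark.createDataFrame(pdf)
df_explicit = spark.createDataFrame(pdf, "d date, ts timestamp")

# Once the fix is in place, both paths should agree on the column types.
assert dict(df_inferred.dtypes) == {"ts": "timestamp", "d": "date"}
assert dict(df_explicit.dtypes) == {"ts": "timestamp", "d": "date"}
```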
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148696316

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
--- End diff --

oh, session.py didn't define `xrange` for Python 3.
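The usual PySpark-style compatibility shim being referred to looks roughly like the following; whether session.py gained exactly this form is an assumption on my part:

```python
import sys

# On Python 3 there is no xrange builtin, so alias it to range;
# several PySpark modules carry a module-level shim along these lines.
if sys.version >= '3':
    xrange = range
```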
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19646#discussion_r148695963

--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
+                    curr_type = cur_dtypes[i]
+                    # If type is a datetime64 timestamp, convert to microseconds
+                    # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+                    # this conversion will lead to an output of py datetime objects, see SPARK-22417
+                    if curr_type == np.dtype('M8[ns]'):
+                        curr_type = 'M8[us]'
+                        has_rec_fix = True
+                    record_type_list.append((str(col_names[i]), curr_type))
+                if not has_rec_fix:
+                    record_type_list = None
--- End diff --

Shall we put this into an internal method?
[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...
GitHub user BryanCutler opened a pull request:

https://github.com/apache/spark/pull/19646

[SPARK-22147][PYTHON] Fix for createDataFrame from pandas.DataFrame with timestamp

## What changes were proposed in this pull request?

Currently, a pandas.DataFrame that contains a timestamp of type 'datetime64[ns]', when converted to a Spark DataFrame with `createDataFrame`, will have its values interpreted as LongType. This fix checks for a timestamp type and converts it to microseconds, which allows Spark to read the values as TimestampType.

## How was this patch tested?

Added a unit test to verify that the Spark schema is the expected TimestampType and DateType when created from pandas.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BryanCutler/spark pyspark-non-arrow-createDataFrame-ts-fix-SPARK-22417

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19646.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19646

commit cca6757b36fbe8a73a81570625f5efa6e24bd8c6
Author: Bryan Cutler
Date:   2017-11-02T23:03:00Z

    added fix for pandas timestamp to convert to microseconds for createDataFrame
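A condensed, standalone sketch of the approach the PR describes: rebuild the record dtype so any datetime64[ns] field becomes datetime64[us] before calling tolist(). The variable names are mine, not the PR's exact code:

```python
import numpy as np
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
records = pdf.to_records(index=False)

# Downcast any datetime64[ns] field to datetime64[us] so that
# tolist() yields Python datetime objects instead of longs.
fixed = [(name, 'M8[us]' if records.dtype[i] == np.dtype('M8[ns]') else records.dtype[i])
         for i, name in enumerate(records.dtype.names)]
rows = [r.tolist() for r in records.astype(fixed)]
print(rows)  # [(datetime.datetime(2017, 10, 31, 1, 1, 1),)]
```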