[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149867086
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Ah, in that case, maybe we need to revert one of the two original patches and fix them one by one, or merge the two follow-ups into a single hot-fix PR. cc @gatorsmile @cloud-fan


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149866007
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

BTW, @ueshin, `branch-2.2` Jenkins will fail due to #19701.
Could you merge #19701 first?


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149865875
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Great, @ueshin ! :)


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149865798
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Thank you, @BryanCutler !


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149865739
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

I can take it over. I'll submit a pr soon.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149863519
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

I can add a patch a little bit later tonight unless someone is able to get 
to it first.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149863279
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Thanks @dongjoon-hyun for tracking this down. It looks like sql/tests.py for branch-2.2 is just missing the following:

```
_have_pandas = False
try:
    import pandas
    _have_pandas = True
except:
    # No Pandas, but that's okay, we'll skip those tests
    pass
```

This was probably added in an earlier PR in master and wasn't included when this was cherry-picked.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-08 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149860091
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
--- End diff --

Hi, @cloud-fan and @BryanCutler.
This seems to break `branch-2.2`, but it has been hidden behind another SQL error (cc @gatorsmile, @henryr).
Please see [this](https://github.com/apache/spark/pull/19701#issuecomment-343037369).

cc @felixcheung since he is RM for 2.2.1.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19646


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-07 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149491411
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
--- End diff --

Yes, it isn't used in the conversion any more, but I think it should still be here for the case where it's None and gets assigned a list of the pdf column names. That way we can keep all pandas-related code in this method.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-07 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149490465
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# If no schema supplied by user then get the names of columns only
+if schema is None:
+schema = [str(x) for x in pdf.columns]
+
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
+
+# Check if any columns need to be fixed for Spark to infer properly
+if len(np_records) > 0:
+record_type_list = self._get_numpy_record_dtypes(np_records[0])
+if record_type_list is not None:
+return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

I don't think this will increase performance; we would still have to iterate over each record and convert it to a list, in addition to making a copy of the timestamp data. Another issue is that using `DataFrame.astype` will truncate the resolution to microseconds, but pandas will continue to store the column as `datetime64[ns]`; see https://stackoverflow.com/a/32827472.

This means we would have to change the conversion routine to split out all the columns of the DataFrame and manually convert them to rows of records instead of using `to_records()` and `tolist()`.

I think it would be best to keep the casting on the numpy side; it's safer and keeps things simpler.
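
For reference, a minimal standalone sketch of the numpy-side cast being discussed (the column name `ts` and the toy DataFrame are illustrative, not the exact Spark code):

```
import pandas as pd

# Illustrative input; pandas stores the column as datetime64[ns].
pdf = pd.DataFrame({"ts": pd.to_datetime(["2017-10-31 01:01:01"])})

rec = pdf.to_records(index=False)[0]
# Without the cast, the ns timestamp comes back from tolist() as a long.
print(rec.tolist())
# Casting the record to microsecond resolution first yields a datetime.datetime.
print(rec.astype([("ts", "datetime64[us]")]).tolist())
```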


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149283762
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
--- End diff --

I guess we can remove `schema` parameter from here because the `schema` 
doesn't affect the conversion now.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149212192
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# If no schema supplied by user then get the names of columns only
+if schema is None:
+schema = [str(x) for x in pdf.columns]
+
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
+
+# Check if any columns need to be fixed for Spark to infer properly
+if len(np_records) > 0:
+record_type_list = self._get_numpy_record_dtypes(np_records[0])
+if record_type_list is not None:
+return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

OK, let's copy it. Is it a valid idea to use `DataFrame.astype`?


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149211050
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# If no schema supplied by user then get the names of columns only
+if schema is None:
+schema = [str(x) for x in pdf.columns]
+
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
+
+# Check if any columns need to be fixed for Spark to infer properly
+if len(np_records) > 0:
+record_type_list = self._get_numpy_record_dtypes(np_records[0])
+if record_type_list is not None:
+return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

Then that would modify the input pandas.DataFrame from the user, which 
would be bad if they use it after this call.  Making a copy of the DataFrame 
might not be good either if it is large.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149210042
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# If no schema supplied by user then get the names of columns only
+if schema is None:
+schema = [str(x) for x in pdf.columns]
+
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
+
+# Check if any columns need to be fixed for Spark to infer properly
+if len(np_records) > 0:
+record_type_list = self._get_numpy_record_dtypes(np_records[0])
--- End diff --

The dtype for a numpy record is in a different format:
```
In [16]: r
Out[16]: (0, datetime.date(2017, 11, 6), 15094116610L)

In [17]: r.dtype
Out[17]: dtype((numpy.record, [(u'index', '
```
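
For illustration (the IPython output above is truncated in the archive), a minimal comparison of the two forms, with arbitrary column names; the per-column types are the same, but they live in different containers:

```
import pandas as pd

pdf = pd.DataFrame({"x": [1], "y": [2.0]})

print(pdf.dtypes)                         # pandas Series of per-column dtypes
print(pdf.to_records(index=False).dtype)  # one numpy structured (record) dtype
```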

[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149207295
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# If no schema supplied by user then get the names of columns only
+if schema is None:
+schema = [str(x) for x in pdf.columns]
+
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
+
+# Check if any columns need to be fixed for Spark to infer properly
+if len(np_records) > 0:
+record_type_list = self._get_numpy_record_dtypes(np_records[0])
+if record_type_list is not None:
+return [r.astype(record_type_list).tolist() for r in np_records], schema
--- End diff --

Instead of doing this, we should call `DataFrame.astype`, which accepts a python dict. Then we can create a dict that maps column names to the corrected dtypes (only including the columns that need to be cast). We can also specify `copy=False` for better performance.
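
For reference, a sketch of what that suggestion could look like (the variable names and toy DataFrame are illustrative; this is not the approach that was merged, and whether pandas actually honors the `[us]` unit here is exactly the caveat raised elsewhere in this thread):

```
import numpy as np
import pandas as pd

# Illustrative input standing in for the user's DataFrame.
pdf = pd.DataFrame({"ts": pd.to_datetime(["2017-10-31 01:01:01"]), "x": [1]})

# Map only the columns that need to be cast to their corrected dtype.
dtype_fixes = {name: "datetime64[us]"
               for name, dt in zip(pdf.columns, pdf.dtypes)
               if dt == np.dtype("datetime64[ns]")}
if dtype_fixes:
    # copy=False asks pandas to avoid copying where possible; depending on the
    # pandas version the [us] unit may be honored, coerced back to [ns], or rejected.
    pdf = pdf.astype(dtype_fixes, copy=False)
```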


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149206178
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _get_numpy_record_dtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is datetime[ns] then np.record.tolist() will output values as longs,
+# conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
+if curr_type == np.dtype('datetime64[ns]'):
+curr_type = 'datetime64[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convert_from_pandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# If no schema supplied by user then get the names of columns only
+if schema is None:
+schema = [str(x) for x in pdf.columns]
+
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
+
+# Check if any columns need to be fixed for Spark to infer properly
+if len(np_records) > 0:
+record_type_list = self._get_numpy_record_dtypes(np_records[0])
--- End diff --

Can't we just use `pdf.dtypes`?
```
>>> pdf.dtypes[0]
dtype('int64')
>>> pdf.to_records(index=False)[0].dtype[0]
dtype('int64')
```

I think they are the same.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149196424
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, 
samplingRatio=None, verifySchema=Tr
 except Exception:
 has_pandas = False
 if has_pandas and isinstance(data, pandas.DataFrame):
-if schema is None:
-schema = [str(x) for x in data.columns]
-data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

The problem is that nanosecond values cannot be converted to a python datetime object, which only has microsecond resolution, so numpy converts them to longs. Numpy will convert microseconds and coarser resolutions to python datetime objects, which Spark will correctly infer.

> according to the ticket, seems we need to convert numpy.datetime64 to python datetime manually.

This fix is just meant to convert nanosecond timestamps to microseconds so that calling `tolist()` produces values that fit in a python object. Does it seem OK to you guys to leave it at that scope for now?
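
A quick illustration of that behavior with plain numpy (the value is arbitrary):

```
import numpy as np

ts_ns = np.datetime64("2017-10-31T01:01:01.000000001", "ns")

# At nanosecond resolution, .item() falls back to a plain integer (ns since
# epoch) because python datetime only goes down to microseconds...
print(type(ts_ns.item()))
# ...while microsecond (or coarser) resolution round-trips to datetime.datetime.
print(type(ts_ns.astype("datetime64[us]").item()))
```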


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149193192
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _getNumpyRecordDtypes(self, rec):
+"""
+Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+the dtypes of records so they can be properly loaded into Spark.
+:param rec: a numpy record to check dtypes
+:return corrected dtypes for a numpy.record or None if no correction needed
+"""
+import numpy as np
+cur_dtypes = rec.dtype
+col_names = cur_dtypes.names
+record_type_list = []
+has_rec_fix = False
+for i in xrange(len(cur_dtypes)):
+curr_type = cur_dtypes[i]
+# If type is a datetime64 timestamp, convert to microseconds
+# NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+# this conversion will lead to an output of py datetime objects, see SPARK-22417
+if curr_type == np.dtype('M8[ns]'):
+curr_type = 'M8[us]'
+has_rec_fix = True
+record_type_list.append((str(col_names[i]), curr_type))
+return record_type_list if has_rec_fix else None
+
+def _convertFromPandas(self, pdf, schema):
+"""
+ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+ :return tuple of list of records and schema
+"""
+# Convert pandas.DataFrame to list of numpy records
+np_records = pdf.to_records(index=False)
--- End diff --

`to_records()` makes a numpy array of numpy records, and the timestamp dtype is `datetime64`. Calling `tolist()` on a record converts everything to a list of python objects.
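
A small standalone example of that conversion path (the single `d` column is illustrative):

```
import pandas as pd

pdf = pd.DataFrame({"d": [pd.Timestamp("2017-11-06").date()]})

np_records = pdf.to_records(index=False)
print(np_records.dtype)        # structured dtype describing the record fields
print(np_records[0].tolist())  # plain python objects, e.g. (datetime.date(2017, 11, 6),)
```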


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149192441
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,16 @@ def test_create_dataframe_from_array_of_long(self):
 df = self.spark.createDataFrame(data)
 self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+@unittest.skipIf(not _have_pandas, "Pandas not installed")
+def test_create_dataframe_from_pandas_with_timestamp(self):
+import pandas as pd
+from datetime import datetime
+pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+"d": [pd.Timestamp.now().date()]})
+df = self.spark.createDataFrame(pdf)
--- End diff --

Yes, it looks like that needs to be fixed also. I thought it was working when a schema was supplied, but I'll double-check and add that to the tests.


---




[GitHub] spark pull request #19646: [SPARK-22417][PYTHON] Fix for createDataFrame fro...

2017-11-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r149192058
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _getNumpyRecordDtypes(self, rec):
--- End diff --

Yeah, I agree we should be using lowercase with underscores, which is more in line with Python convention. I was only using this format to stay consistent with the rest of the file, but I can change it. Just for the new methods, right?


---
