[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148791016
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

according to the ticket, it seems we need to convert numpy.datetime64 to Python datetime manually.
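
A minimal sketch of what such a manual conversion could look like (the helper name `to_py_datetime` is just for illustration, not part of this PR):

```python
import numpy as np
from datetime import datetime

def to_py_datetime(value):
    # A nanosecond-precision numpy.datetime64 becomes a long when read out with
    # tolist(), so cast it down to microseconds first, then to a Python datetime.
    return value.astype('datetime64[us]').astype(datetime)

print(to_py_datetime(np.datetime64('2017-10-31T01:01:01.000000001')))
# 2017-10-31 01:01:01  (nanoseconds are truncated at microsecond precision)
```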


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148784896
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

(It reminds me of [SPARK-6857](https://issues.apache.org/jira/browse/SPARK-6857) BTW)


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148782931
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

but ... `numpy.datetime64` is not supported in `createDataFrame` IIUC:

```python
import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
print [[v for v in r] for r in pdf.to_records(index=False)]
spark.createDataFrame([[v for v in r] for r in pdf.to_records(index=False)])
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 591, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 404, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/.../spark/python/pyspark/sql/session.py", line 336, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/.../spark/python/pyspark/sql/types.py", line 1095, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/.../spark/python/pyspark/sql/types.py", line 1072, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.datetime64'>
```


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148779399
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
-            if schema is None:
-                schema = [str(x) for x in data.columns]
-            data = [r.tolist() for r in data.to_records(index=False)]
--- End diff --

seems `r.tolist` is the problem, how about `r[i] for i in xrange(r.size)`? Then we can get `numpy.datetime64`:
```
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0].tolist()[0]
15094116610L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T02:01:01.0+0100')
>>>
```


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148778328
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

thanks! I also checked the dtypes:
```
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).dtypes
ts    datetime64[ns]
dtype: object
>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).dtypes
d    object
dtype: object
```


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148773536
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

I got:

```python
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 
1)]}).to_records(index=False)[0].tolist()[0]
15094116610L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 
1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T01:01:01.0')
```

whereas:

```python

>>> pd.DataFrame({"d": 
[pd.Timestamp.now().date()]}).to_records(index=False)[0].tolist()[0]
datetime.date(2017, 11, 3)
>>> pd.DataFrame({"d": 
[pd.Timestamp.now().date()]}).to_records(index=False)[0][0]
datetime.date(2017, 11, 3)
```


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148771775
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,16 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_create_dataframe_from_pandas_with_timestamp(self):
+        import pandas as pd
+        from datetime import datetime
+        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+                            "d": [pd.Timestamp.now().date()]})
+        df = self.spark.createDataFrame(pdf)
--- End diff --

I was checking this PR and ran this out of curiosity. I got:

```python
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)], "d": 
[pd.Timestamp.now().date()]})
spark.createDataFrame(pdf, "d date, ts timestamp")
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 587, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 401, in _createFromLocal
    data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 567, in prepare
    verify_func(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1405, in verify_default
    verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types
    % (dataType, obj, type(obj)))
TypeError: field ts: TimestampType can not accept object 15094116610L in type <type 'long'>
```


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148770218
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
--- End diff --

I know it's confusing .. but I usually use thisNamingRule mainly for API consistency and otherwise use this_naming_rule. I actually checked and read the documentation and other code a few times to clarify this for myself .. I believe this_naming_rule is preferred by PEP 8.

But I know that the [doc](https://www.python.org/dev/peps/pep-0008/#function-names) says:

> mixedCase is allowed only in contexts where that's already the prevailing style (e.g. threading.py), to retain backwards compatibility.

but .. I believe we should avoid thisNamingRule when it's specifically for internal use and/or unrelated to compatibility. To my knowledge, `threading.py` is a similar case ...


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148763906
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
--- End diff --

ditto for naming


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148763235
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
--- End diff --

nit: `_getNumpyRecordDtypes` -> `_get_numpy_record_dtypes`.


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148724335
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
+                curr_type = 'M8[us]'
+                has_rec_fix = True
+            record_type_list.append((str(col_names[i]), curr_type))
+        return record_type_list if has_rec_fix else None
+
+    def _convertFromPandas(self, pdf, schema):
+        """
+        Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
+        :return tuple of list of records and schema
+        """
+        # Convert pandas.DataFrame to list of numpy records
+        np_records = pdf.to_records(index=False)
--- End diff --

after `to_records`, what's the type of the timestamp value? Python datetime?


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148707442
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
--- End diff --

Ooops, I forgot about that. thx!


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148707441
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
+                    curr_type = cur_dtypes[i]
+                    # If type is a datetime64 timestamp, convert to microseconds
+                    # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+                    # this conversion will lead to an output of py datetime objects, see SPARK-22417
+                    if curr_type == np.dtype('M8[ns]'):
+                        curr_type = 'M8[us]'
+                        has_rec_fix = True
+                    record_type_list.append((str(col_names[i]), curr_type))
+                if not has_rec_fix:
+                    record_type_list = None
--- End diff --

Yeah, probably a good idea.  I'll see if I can clean it up some. Thanks 
@viirya !


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148709143
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
--- End diff --

There shouldn't be any difference for the most part. I only used `M8` here because, when debugging these types, that is what `numpy.record.dtype` output for the record types. Would you prefer `datetime64`, if that works?
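
(For reference, a quick check — `'M8'` is just numpy's array-protocol type code for `datetime64`, so the two spellings denote the same dtype:)

```python
import numpy as np

# 'M8[ns]' and 'datetime64[ns]' are two spellings of the same dtype
print(np.dtype('M8[ns]') == np.dtype('datetime64[ns]'))  # True
print(np.dtype('M8[ns]').name)                           # datetime64[ns]
```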


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148708752
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
--- End diff --

Isn't this `datetime64[ns]`? What's the difference between `M8[ns]` and `datetime64[ns]`?


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148709362
  
--- Diff: python/pyspark/sql/session.py ---
@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema
 
+    def _getNumpyRecordDtypes(self, rec):
+        """
+        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
+        the dtypes of records so they can be properly loaded into Spark.
+        :param rec: a numpy record to check dtypes
+        :return corrected dtypes for a numpy.record or None if no correction needed
+        """
+        import numpy as np
+        cur_dtypes = rec.dtype
+        col_names = cur_dtypes.names
+        record_type_list = []
+        has_rec_fix = False
+        for i in xrange(len(cur_dtypes)):
+            curr_type = cur_dtypes[i]
+            # If type is a datetime64 timestamp, convert to microseconds
+            # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+            # this conversion will lead to an output of py datetime objects, see SPARK-22417
+            if curr_type == np.dtype('M8[ns]'):
--- End diff --

Yes, I'd prefer it if that works, otherwise I'd like you to add some 
comments saying we can use `M8[ns]` instead of `datetime64[ns]`.


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148709757
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2592,6 +2592,16 @@ def test_create_dataframe_from_array_of_long(self):
         df = self.spark.createDataFrame(data)
         self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_create_dataframe_from_pandas_with_timestamp(self):
+        import pandas as pd
+        from datetime import datetime
+        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
+                            "d": [pd.Timestamp.now().date()]})
+        df = self.spark.createDataFrame(pdf)
--- End diff --

What if we specify the schema? For example:

```
df = self.spark.createDataFrame(pdf, "ts timestamp, d date")
```


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148696316
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
--- End diff --

oh, session.py doesn't define `xrange` for Python >= 3.
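
A minimal sketch of the kind of compatibility alias that would be needed at the top of the module (mirroring the Python 2/3 shims used elsewhere in PySpark; alternatively the loop could simply use `range`):

```python
import sys

if sys.version >= '3':
    # Python 3 has no builtin xrange, so alias it to range
    # (or just write the loop as: for i in range(len(cur_dtypes)))
    xrange = range
```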


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19646#discussion_r148695963
  
--- Diff: python/pyspark/sql/session.py ---
@@ -512,9 +512,39 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            import numpy as np
+
+            # Convert pandas.DataFrame to list of numpy records
+            np_records = data.to_records(index=False)
+
+            # Check if any columns need to be fixed for Spark to infer properly
+            record_type_list = None
+            if schema is None and len(np_records) > 0:
+                cur_dtypes = np_records[0].dtype
+                col_names = cur_dtypes.names
+                record_type_list = []
+                has_rec_fix = False
+                for i in xrange(len(cur_dtypes)):
+                    curr_type = cur_dtypes[i]
+                    # If type is a datetime64 timestamp, convert to microseconds
+                    # NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
+                    # this conversion will lead to an output of py datetime objects, see SPARK-22417
+                    if curr_type == np.dtype('M8[ns]'):
+                        curr_type = 'M8[us]'
+                        has_rec_fix = True
+                    record_type_list.append((str(col_names[i]), curr_type))
+                if not has_rec_fix:
+                    record_type_list = None
--- End diff --

Shall we put this into an internal method?


---




[GitHub] spark pull request #19646: [SPARK-22147][PYTHON] Fix for createDataFrame fro...

2017-11-02 Thread BryanCutler
GitHub user BryanCutler opened a pull request:

https://github.com/apache/spark/pull/19646

[SPARK-22147][PYTHON] Fix for createDataFrame from pandas.DataFrame with 
timestamp

## What changes were proposed in this pull request?

Currently, a pandas.DataFrame that contains a timestamp column of type 'datetime64[ns]', when converted to a Spark DataFrame with `createDataFrame`, will have its values interpreted as LongType. This fix checks for a timestamp type and converts it to microseconds, which allows Spark to read the values as TimestampType.
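
Roughly, the idea is as follows (a simplified sketch of the approach, not the exact code in this PR):

```python
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
records = pdf.to_records(index=False)
# records.dtype is [('ts', '<M8[ns]')]; tolist() on nanosecond values yields
# longs, which Spark then infers as LongType.

# Casting the datetime64[ns] field down to microseconds makes tolist() produce
# Python datetime objects, which Spark infers as TimestampType.
fixed = records.astype([('ts', 'datetime64[us]')])
print(fixed.tolist())  # [(datetime.datetime(2017, 10, 31, 1, 1, 1),)]
```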

## How was this patch tested?

Added a unit test to verify that the Spark schema has the expected TimestampType and DateType when the DataFrame is created from pandas.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BryanCutler/spark pyspark-non-arrow-createDataFrame-ts-fix-SPARK-22417

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19646.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19646


commit cca6757b36fbe8a73a81570625f5efa6e24bd8c6
Author: Bryan Cutler 
Date:   2017-11-02T23:03:00Z

added fix for pandas timestamp to convert to microseconds for 
createDataFrame




---
