[jira] [Commented] (SPARK-27939) Defining a schema with VectorUDT

2019-06-05 Thread Johannes Schaffrath (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-27939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856975#comment-16856975 ]

Johannes Schaffrath commented on SPARK-27939:
---------------------------------------------

Hi Bryan,

Thank you very much for the detailed information. I just saw that this behavior 
is also mentioned in the documentation [1], but as you said, it is not intuitive.

[1] 
http://spark.apache.org/docs/2.2.1/api/python/pyspark.sql.html#pyspark.sql.Row

> Defining a schema with VectorUDT
> --------------------------------
>
> Key: SPARK-27939
> URL: https://issues.apache.org/jira/browse/SPARK-27939
> Project: Spark
>  Issue Type: Bug
>  Components: ML, PySpark
>Affects Versions: 2.4.0
>Reporter: Johannes Schaffrath
>Priority: Minor
>
> When I try to define a dataframe schema which has a VectorUDT field, I run 
> into an error when the VectorUDT field is not the last element of the 
> StructType list.
> The following example causes the error below:
> {code:python}
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
> from pyspark.sql import Row
> from pyspark.ml.linalg import VectorUDT, SparseVector
>
> # VectorUDT should be the last StructField (otherwise the error below occurs)
> train_schema = T.StructType([
>     T.StructField('features', VectorUDT()),
>     T.StructField('SALESCLOSEPRICE', T.IntegerType())
>     ])
>
> train_df = spark.createDataFrame(
> [Row(features=SparseVector(135, {0: 139900.0, 1: 139900.0, 2: 980.0, 3: 10.0, 
> 5: 980.0, 6: 1858.0, 7: 1858.0, 8: 980.0, 9: 1950.0, 10: 1.28, 11: 1.0, 12: 
> 1.0, 15: 2.0, 16: 3.0, 20: 2017.0, 21: 7.0, 22: 28.0, 23: 15.0, 24: 196.0, 
> 25: 25.0, 26: -1.0, 27: 4.03, 28: 3.96, 29: 3.88, 30: 3.9, 31: 3.91, 32: 9.8, 
> 33: 22.4, 34: 67.8, 35: 49.8, 36: 11.9, 37: 2.7, 38: 0.2926, 39: 142.7551, 
> 40: 980.0, 41: 0.0133, 42: 1.5, 43: 1.0, 51: -1.0, 52: -1.0, 53: -1.0, 54: 
> -1.0, 55: -1.0, 56: -1.0, 57: -1.0, 62: 1.0, 68: 1.0, 77: 1.0, 81: 1.0, 89: 
> 1.0, 95: 1.0, 96: 1.0, 101: 1.0, 103: 1.0, 108: 1.0, 114: 1.0, 115: 1.0, 123: 
> 1.0, 133: 1.0}), SALESCLOSEPRICE=143000),
>  Row(features=SparseVector(135, {0: 21.0, 1: 21.0, 2: 1144.0, 3: 4.0, 
> 5: 1268.0, 6: 1640.0, 7: 1640.0, 8: 2228.0, 9: 1971.0, 10: 0.32, 11: 1.0, 14: 
> 2.0, 15: 3.0, 16: 4.0, 17: 960.0, 20: 2017.0, 21: 10.0, 22: 41.0, 23: 9.0, 
> 24: 282.0, 25: 2.0, 26: -1.0, 27: 3.91, 28: 3.85, 29: 3.83, 30: 3.83, 31: 
> 3.78, 32: 32.2, 33: 49.0, 34: 18.8, 35: 14.0, 36: 35.8, 37: 14.6, 38: 0.4392, 
> 39: 94.2549, 40: 2228.0, 41: 0.0078, 42: 1., 43: -1.0, 44: -1.0, 45: 
> -1.0, 46: -1.0, 47: -1.0, 48: -1.0, 49: -1.0, 50: -1.0, 52: 1.0, 55: -1.0, 
> 56: -1.0, 57: -1.0, 62: 1.0, 68: 1.0, 77: 1.0, 79: 1.0, 89: 1.0, 92: 1.0, 96: 
> 1.0, 101: 1.0, 103: 1.0, 108: 1.0, 114: 1.0, 115: 1.0, 124: 1.0, 133: 1.0}), 
> SALESCLOSEPRICE=19),
>  Row(features=SparseVector(135, {0: 225000.0, 1: 225000.0, 2: 1102.0, 3: 
> 28.0, 5: 1102.0, 6: 2390.0, 7: 2390.0, 8: 1102.0, 9: 1949.0, 10: 0.822, 11: 
> 1.0, 15: 1.0, 16: 2.0, 20: 2017.0, 21: 6.0, 22: 26.0, 23: 26.0, 24: 177.0, 
> 25: 25.0, 26: -1.0, 27: 3.88, 28: 3.9, 29: 3.91, 30: 3.89, 31: 3.94, 32: 9.8, 
> 33: 22.4, 34: 67.8, 35: 61.7, 36: 2.7, 38: 0.4706, 39: 204.1742, 40: 1102.0, 
> 41: 0.0106, 42: 2.0, 49: 1.0, 51: -1.0, 52: -1.0, 53: -1.0, 54: -1.0, 57: 
> 1.0, 62: 1.0, 68: 1.0, 70: 1.0, 79: 1.0, 89: 1.0, 92: 1.0, 96: 1.0, 100: 1.0, 
> 103: 1.0, 108: 1.0, 110: 1.0, 115: 1.0, 123: 1.0, 131: 1.0, 132: 1.0}), 
> SALESCLOSEPRICE=225000)
>  ], schema=train_schema)
>  
> train_df.printSchema()
> train_df.show()
> {code}
> Error message:
> {code}
> Fail to execute line 17: ], schema=train_schema)
> Traceback (most recent call last):
>   File "/tmp/zeppelin_pyspark-3793375738105660281.py", line 375, in <module>
>     exec(code, _zcUserQueryNameSpace)
>   File "<stdin>", line 17, in <module>
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 748, in createDataFrame
>     rdd, schema = self._createFromLocal(map(prepare, data), schema)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 429, in _createFromLocal
>     data = [schema.toInternal(row) for row in data]
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 429, in <listcomp>
>     data = [schema.toInternal(row) for row in data]
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 604, in toInternal
>     for f, v, c in zip(self.fields, obj, self._needConversion))
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 604, in <genexpr>
>     for f, v, c in zip(self.fields, obj, self._needConversion))
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 442, in toInternal
>     return self.dataType.toInternal(obj)
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 685, in toInternal
>     return self._cachedSqlType().toInternal(self.serialize(obj))
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 167, in serialize
>     raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))
> TypeError:
> {code}

[jira] [Commented] (SPARK-27939) Defining a schema with VectorUDT

2019-06-04 Thread Bryan Cutler (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-27939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855969#comment-16855969 ]

Bryan Cutler commented on SPARK-27939:
--------------------------------------

Another problem with the Python {{Row}} class.


[jira] [Commented] (SPARK-27939) Defining a schema with VectorUDT

2019-06-04 Thread Bryan Cutler (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-27939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16855966#comment-16855966 ]

Bryan Cutler commented on SPARK-27939:
--------------------------------------

The problem is that the {{Row}} class sorts the field names alphabetically, which 
puts capital letters first and so conflicts with the field order in your schema:
{noformat}
r = Row(features=SparseVector(135, {0: 139900.0, 1: 139900.0, ...}), 
SALESCLOSEPRICE=143000)

In [3]: r.__fields__
Out[3]: ['SALESCLOSEPRICE', 'features']{noformat}
This is by design, but it is not intuitive and has caused lots of problems.

You can either specify your data as tuples, for example:
{noformat}
In [5]: train_df = spark.createDataFrame([(SparseVector(135, {0: 139900.0}), 
143000)], schema=train_schema)

In [6]: train_df.show()
+--------------------+---------------+
|            features|SALESCLOSEPRICE|
+--------------------+---------------+
|(135,[0],[139900.0])|         143000|
+--------------------+---------------+
{noformat}
Or if you want keyword access, then define your own row class like this:
{noformat}
In [7]: MyRow = Row('features', 'SALESCLOSEPRICE')

In [8]: MyRow(SparseVector(135, {0: 139900.0}), 143000)
Out[8]: Row(features=SparseVector(135, {0: 139900.0}), 
SALESCLOSEPRICE=143000){noformat}
