numpy arrays and spark sql

2014-12-01 Thread Joseph Winston
This works as expected in the 1.1 branch: 

from pyspark.sql import *

rdd = sc.parallelize([range(0, 10), range(10, 20), range(20, 30)])

# define the schema
schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
fields = [StructField(field_name, IntegerType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaRDD = sqlContext.applySchema(rdd, schema)

# Register the table
schemaRDD.registerTempTable("slice")

# SQL can be run over SchemaRDDs that have been registered as a table.
results = sqlContext.sql("SELECT value1 FROM slice")

# The results of SQL queries are RDDs and support all the normal RDD operations.
print results.collect()
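
For reference, with the sample data above the collected result should print something like the following (one Row per input row; the exact Row repr may vary with the Spark version):

[Row(value1=0), Row(value1=10), Row(value1=20)]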

However, changing the rdd to use a numpy array fails:

import numpy as np
rdd = sc.parallelize(np.arange(20).reshape(2, 10))

# define the schema
schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
fields = [StructField(field_name, np.ndarray, True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaRDD = sqlContext.applySchema(rdd, schema)

The error is:
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 1119, in applySchema
    _verify_type(row, schema)
  File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 735, in _verify_type
    % (dataType, type(obj)))
TypeError: StructType(List(StructField(value1,<type 'numpy.ndarray'>,true),StructField(value2,<type 'numpy.ndarray'>,true),StructField(value3,<type 'numpy.ndarray'>,true),StructField(value4,<type 'numpy.ndarray'>,true),StructField(value5,<type 'numpy.ndarray'>,true),StructField(value6,<type 'numpy.ndarray'>,true),StructField(value7,<type 'numpy.ndarray'>,true),StructField(value8,<type 'numpy.ndarray'>,true),StructField(value9,<type 'numpy.ndarray'>,true),StructField(value10,<type 'numpy.ndarray'>,true))) can not accept object in type <type 'numpy.ndarray'>

I’ve tried np.int_ and np.int32, and they fail too. What type should I use to
make numpy arrays work?



Re: numpy arrays and spark sql

2014-12-01 Thread Davies Liu
applySchema() only accepts an RDD of Row/list/tuple; it does not work with
numpy.array.

After applySchema(), the Python RDD will be pickled and unpickled in the
JVM, so you will not get any benefit from using numpy.array.

It will work if you convert each ndarray into a list:

schemaRDD = sqlContext.applySchema(rdd.map(list), schema)
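
For completeness, here is a minimal end-to-end sketch of the workaround, assuming the Spark 1.1 PySpark shell (sc already defined) and the same ten-column schema as above. The explicit int() conversion and the generated field names are just one conservative variant; rdd.map(list) as shown above also works:

import numpy as np
from pyspark.sql import SQLContext, StructType, StructField, IntegerType

sqlContext = SQLContext(sc)

# Two rows of ten values each, as numpy arrays.
rdd = sc.parallelize(np.arange(20).reshape(2, 10))

# The schema must use Spark SQL types, not numpy types.
fields = [StructField("value%d" % i, IntegerType(), True) for i in range(1, 11)]
schema = StructType(fields)

# Convert each ndarray row to a list of plain Python ints before applying the schema.
schemaRDD = sqlContext.applySchema(rdd.map(lambda row: [int(x) for x in row]), schema)
schemaRDD.registerTempTable("slice")

print sqlContext.sql("SELECT value1 FROM slice").collect()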
