Hi Guys, 

I used applySchema to store a set of nested dictionaries and lists in a
parquet file. 

http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-td20228.html#a20461

It was successful, and I could load the data back as well. Now I'm
trying to convert this SchemaRDD to an RDD of dictionaries so that I can run
some reduces on them.

The schema of my RDD is as follows:
 |-- field1: string (nullable = true)
 |-- field2: integer (nullable = true)
 |-- field3: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = true)
 |-- field4: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- field5: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- field61: string (nullable = true)
 |    |    |-- field62: string (nullable = true)
 |    |    |-- field63: integer (nullable = true)

I'm using the following mapper to map these fields to an RDD that I can
reduce later.

def generateRecords(line):
        # input : the row stored in parquet file
        # output : a (field1, dict) pair; the dict holds the remaining fields
        field1 = line.field1
        summary = {}
        summary['field2'] = line.field2
        summary['field3'] = line.field3
        summary['field4'] = line.field4
        summary['field5'] = line.field5
        summary['field6'] = line.field6
        # key the record by field1 (the original returned an undefined name, guid)
        return (field1, summary)

profiles = sqc.parquetFile(path)
profileRecords = profiles.map(lambda line: generateRecords(line))

This code works perfectly well when field6 is not mapped, i.e. when you
comment out the line that maps field6 in generateRecords. The RDD gets
generated perfectly, and even field5 gets mapped. The key difference between
field5 and field6 is that field5 is a list of strings, while field6 is a list
of tuples of the form (String, String, Int). But when you map field6 and
then call count() on the result, it throws:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 847, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/root/spark/python/pyspark/rdd.py", line 838, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/root/spark/python/pyspark/rdd.py", line 759, in reduce
    vals = self.mapPartitions(func).collect()
  File "/root/spark/python/pyspark/rdd.py", line 723, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o88.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 3.0 failed 4 times, most recent failure: Lost task 32.3 in stage 3.0 (TID 1829, ip-172-31-18-36.ec2.internal):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/root/spark/python/pyspark/serializers.py", line 128, in dump_stream
    self._write_with_length(obj, stream)
  File "/root/spark/python/pyspark/serializers.py", line 138, in _write_with_length
    serialized = self.dumps(obj)
  File "/root/spark/python/pyspark/serializers.py", line 356, in dumps
    return cPickle.dumps(obj, 2)
PicklingError: Can't pickle <class 'pyspark.sql.List'>: attribute lookup pyspark.sql.List failed
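
For what it's worth, the same kind of pickling failure can be reproduced
without Spark. The sketch below is plain Python and only assumes that
pyspark.sql.List is a class created at runtime that pickle cannot find by
name; I have not confirmed that this is what pyspark actually does.
make_list_class and try_pickle are hypothetical helpers written for this
example, not pyspark functions.

```python
import pickle


def make_list_class():
    # Hypothetical stand-in for a class built at runtime. The name "List"
    # only mirrors the name in the error message above.
    class List(list):
        pass
    return List


def try_pickle(obj):
    # Return None if pickling succeeds, otherwise the error message.
    try:
        pickle.dumps(obj, 2)
        return None
    except (pickle.PicklingError, AttributeError) as exc:
        return str(exc)


# Bind the class under a different name, so that looking up "List" in the
# defining module cannot find it.
RuntimeList = make_list_class()
wrapped = RuntimeList(["a", "b"])

plain_error = try_pickle(["a", "b"])   # built-in list
wrapped_error = try_pickle(wrapped)    # instance of the runtime-created class

print(plain_error)
print(wrapped_error)
```

The built-in list pickles fine, while the instance of the runtime-created
class does not, because pickle stores a class by module and name and has to
look it up again by that name.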

Can someone help me to understand what is going wrong here?
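
In the meantime, one workaround I am considering is to copy the nested
values into built-in Python containers before returning them from the
mapper. This is only a sketch: to_plain is a hypothetical helper, FakeList
is a stand-in so the example runs without Spark, and I am assuming (not
having verified) that the pyspark wrapper objects subclass list, dict or
tuple and can be iterated like them. Struct elements would come out as plain
tuples, so their field names would be lost.

```python
import pickle


def to_plain(value):
    # Recursively copy nested containers into built-in dict/list/tuple objects.
    if isinstance(value, dict):
        return dict((to_plain(k), to_plain(v)) for k, v in value.items())
    if isinstance(value, list):
        return [to_plain(v) for v in value]
    if isinstance(value, tuple):
        return tuple(to_plain(v) for v in value)
    # Strings, integers, None and so on are returned unchanged.
    return value


class FakeList(list):
    """Hypothetical stand-in for a list wrapper; not a pyspark class."""


nested = FakeList([("a", "b", 1), ("c", "d", 2)])
plain = to_plain({"field6": nested})

# The converted value contains only built-in types and survives a round trip.
restored = pickle.loads(pickle.dumps(plain, 2))
print(type(plain["field6"]).__name__, restored == plain)
```

In generateRecords this would mean writing
summary['field6'] = to_plain(line.field6) instead of assigning line.field6
directly.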

Many thanks
SahanB




