Greg Bowyer created SPARK-10418:
-----------------------------------

             Summary: pyspark issue with nested array types
                 Key: SPARK-10418
                 URL: https://issues.apache.org/jira/browse/SPARK-10418
             Project: Spark
          Issue Type: Bug
            Reporter: Greg Bowyer


Hi all

Creating a nested dataframe with arraytypes seems to make a broken dataframe.

{code}

In [14]: df.printSchema()                                                       
                                                                                
                                       [150/200119]
root
 |-- export_date: long (nullable = true)
 |-- song_id: integer (nullable = true)
 |-- ranked_genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- song_id: long (nullable = true)
 |    |    |-- ranked_genre_paths: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- genre_id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- parent: long (nullable = true)
 |    |    |    |    |    |-- level: long (nullable = true)
 |    |    |    |    |    |-- rank: long (nullable = true)
 |-- collections: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- collection_id: integer (nullable = true)
 |    |    |-- song_id: integer (nullable = true)
{code}

{code}
/home/gbowyer/apple-epf/spark/spark/python/pyspark/context.pyc in runJob(self, 
rdd, partitionFunc, partitions, allowLocal)
    895         mappedRDD = rdd.mapPartitions(partitionFunc)
    896         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), 
mappedRDD._jrdd, partitions,
--> 897                                           allowLocal)
    898         return list(_load_from_socket(port, 
mappedRDD._jrdd_deserializer))
    899 

/home/gbowyer/apple-epf/spark/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/home/gbowyer/apple-epf/spark/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 97.0 failed 4 times, most recent failure: Lost task 0.3 in stage 97.0 
(TID 3868, compute-10-2-44-13.us-east-1.urx.internal): 
java.lang.ClassCastException: java.util.ArrayList cannot be cast to 
org.apache.spark.sql.Row
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$4.apply(CatalystTypeConverters.scala:301)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:282)
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
        at 
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$4.apply(CatalystTypeConverters.scala:301)
        at 
org.apache.spark.sql.DataFrame$$anonfun$35$$anonfun$apply$9.apply(DataFrame.scala:1369)
        at 
org.apache.spark.sql.DataFrame$$anonfun$35$$anonfun$apply$9.apply(DataFrame.scala:1369)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
        at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:111)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
        at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
        at 
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
        at 
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

Thoughts, ideas? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to