Glenn Strycker created SPARK-10762:
--------------------------------------

             Summary: GenericRowWithSchema exception in casting ArrayBuffer to 
HashSet in DataFrame to RDD from Hive table
                 Key: SPARK-10762
                 URL: https://issues.apache.org/jira/browse/SPARK-10762
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
            Reporter: Glenn Strycker


I have a Hive table in parquet format that was generated using

{code}
create table myTable (var1 int, var2 string, var3 int, var4 string, var5 
array<struct<a:int,b:string>>) stored as parquet;
{code}

I am able to verify that the table was populated -- here is a sample row

{code}
[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
{code}

I wish to put this into a Spark RDD of the form

{code}
((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
{code}

Now, using spark-shell (I get the same problem in spark-submit), I made a test 
RDD with these values

{code}
scala> import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer

scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
{code}

Iterating over the ArrayBuffer, I can convert it to a HashSet in the following new RDD:

{code}
scala> import scala.collection.immutable.HashSet
import scala.collection.immutable.HashSet

scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
{code}
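
As an aside, the same conversion can be written without the mutable var. The sketch below is not from my original session and produces a plain immutable Set rather than a HashSet specifically, but it expresses the same ArrayBuffer-to-Set conversion:

{code}
// equivalent sketch: let toSet do the conversion instead of folding into a HashSet by hand
val tempRDD2b = tempRDD.map { case (k, (v, buf)) => (k, (v, buf.toSet)) }
{code}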

But when I attempt to do the EXACT SAME THING with a DataFrame from a HiveContext / SQLContext, I get the following error:

{code}
scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._

scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")

scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 
(TID 5211, localhost): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to scala.Tuple2
       at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
       at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
       at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
       at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
       at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
       at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
       at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
       at org.apache.spark.scheduler.Task.run(Task.scala:64)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:724)

Driver stacktrace:
       at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
       at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
       at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
       at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
       at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at scala.Option.foreach(Option.scala:236)
       at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
       at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

Note that I get this same error, "GenericRowWithSchema cannot be cast to scala.Tuple2", when I run this in a compiled program via spark-submit. The program crashes at RUN TIME when it reaches the conversion step; there are no compile-time errors.

It seems very strange to me that my artificially generated RDD "tempRDD" would 
work with the conversion, whereas the Hive query DataFrame->RDD did not. I 
checked, and both of the RDDs have the same form:

{code}
scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
{code}

The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3 (roughly as sketched below); all attempts produced the same error message.
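
For concreteness, this is the sort of thing I mean by persisting and materializing first -- a sketch, not the exact code from my session (the checkpoint directory is a made-up path):

{code}
// sketch only: persist, checkpoint, and fully materialize the Hive-derived RDD
// before attempting the ArrayBuffer -> HashSet conversion
sc.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical directory

tempRDDfromHive.persist()
tempRDDfromHive.checkpoint()
tempRDDfromHive.count()   // forces materialization before building tempRDD3
{code}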

I also read through related Stack Overflow questions and Apache Spark JIRA issues, and based on those I attempted casting the ArrayBuffer to an Iterator instead, but that also failed on the second step with the same error.

Since the error occurs only for the Hive table version, I'm tempted to think this is an issue with the Spark/Hive integration in Spark SQL.
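
Reading the exception literally, the struct elements inside var5 appear to come back from Spark SQL as Row objects (GenericRowWithSchema) rather than as Scala tuples, and the asInstanceOf[ArrayBuffer[(Int,String)]] cast doesn't fail until the elements are actually iterated. For completeness, here is a hedged sketch of the field-by-field extraction I would expect to sidestep the cast, assuming the var5 column really does arrive as a sequence of Rows (I have not verified that this is the intended behavior):

{code}
// sketch only, not from my original session: treat the struct elements as Rows
// and pull their fields out explicitly instead of casting them to (Int, String)
import org.apache.spark.sql.Row

val tempRDDfromHive2 = tempHiveQL.map(a => (
  (a(0).toString.toInt, a(1).toString),
  ((a(2).toString.toInt, a(3).toString),
   a(4).asInstanceOf[Seq[Row]].map(r => (r.getInt(0), r.getString(1))).toSet)))
{code}

If that is the shape Spark SQL actually hands back, it would also explain why the hand-built tempRDD converts cleanly, since its elements genuinely are tuples.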

Possibly related Apache Spark Jira Issues:
https://issues.apache.org/jira/browse/SPARK-1040
https://issues.apache.org/jira/browse/SPARK-2737
https://issues.apache.org/jira/browse/SPARK-4489 (still open)



