[ https://issues.apache.org/jira/browse/SPARK-10762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Glenn Strycker closed SPARK-10762.
----------------------------------
This probably isn't completely fixed, but casting ArrayBuffers correctly should be tracked as a new ticket.

GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table
----------------------------------------------------------------------------------------------------

                Key: SPARK-10762
                URL: https://issues.apache.org/jira/browse/SPARK-10762
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
           Reporter: Glenn Strycker

I have a Hive table in Parquet format that was generated using

{code}
create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
{code}

I am able to verify that it was filled; here is a sample value:

{code}
[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
{code}

I wish to put this into a Spark RDD of the form

{code}
((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
{code}

Using spark-shell (I get the same problem in spark-submit), I made a test RDD with these values:

{code}
scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
{code}

Using an iterator, I can convert the ArrayBuffer to a HashSet in the following new RDD:

{code}
scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))
{code}
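(As an aside, the fold into the mutable HashSet can be written more compactly. A minimal equivalent sketch for the tempRDD2 step, assuming the same tuple shape; the static result type is Set rather than HashSet, but the contents are the same:)

{code}
// ArrayBuffer is a TraversableOnce, so .toSet materializes an immutable set
// directly, replacing the manual fold into a mutable var above.
val tempRDD2b = tempRDD.map { case (k, (v, buf)) => (k, (v, buf.toSet)) }
{code}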
But when I attempt to do the EXACT SAME THING with a DataFrame from a HiveContext/SQLContext, I get the following error:

{code}
scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._

scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")

scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

Note that I get this same error ("GenericRowWithSchema cannot be cast to scala.Tuple2") when I run this in a compiled program using spark-submit. The program crashes at RUN TIME when it encounters the conversion step, with no compiler errors beforehand.
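(This run-time-only failure is consistent with JVM type erasure: asInstanceOf on a generic collection checks only the collection class, never the element type, so the compiler has nothing to reject and the bad cast only surfaces when an element is first used as a Tuple2. A minimal illustrative sketch, standalone and not tied to the table above:)

{code}
import scala.collection.mutable.ArrayBuffer

// A buffer whose elements are NOT tuples, typed as Any to mimic Row#apply.
val buf: Any = ArrayBuffer("not", "tuples")

// Succeeds: erasure means only the ArrayBuffer class itself is checked,
// so the (Int, String) element type is never verified here.
val cast = buf.asInstanceOf[ArrayBuffer[(Int, String)]]

// Throws java.lang.ClassCastException (String cannot be cast to Tuple2)
// only now, when an element is first treated as a tuple.
cast.foreach(t => println(t._1))
{code}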
It seems very strange to me that my artificially generated RDD "tempRDD" works with the conversion, whereas the Hive query DataFrame->RDD does not. I checked, and both RDDs have the same form:

{code}
scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
{code}

The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3; all attempts produced the same error message.

I also read through related Stack Overflow questions and Apache Spark JIRA issues, and from those I attempted casting the ArrayBuffer as an Iterator instead, but that also failed on the second step with the same error.

Since the error appears only for the Hive table version, I'm tempted to think this is an issue with Spark/Hive integration in Spark SQL. (A possible Row-based workaround is sketched after the list of related issues below.)

Possibly related Apache Spark JIRA issues:
https://issues.apache.org/jira/browse/SPARK-1040
https://issues.apache.org/jira/browse/SPARK-2737
https://issues.apache.org/jira/browse/SPARK-4489 (still open)
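For reference, a possible workaround sketch (unverified against this table; the field positions follow the a:int, b:string struct schema above): rather than casting the array column straight to Scala tuples, treat each struct element as a Row and extract its fields explicitly before building the set.

{code}
import org.apache.spark.sql.Row

// Spark SQL hands back an array<struct<...>> column as a Seq of Rows (each
// struct is itself a Row), not as Scala tuples, so convert element by element.
val tempRDDfromHive2 = tempHiveQL.map { a =>
  ((a(0).toString.toInt, a(1).toString),
   ((a(2).toString.toInt, a(3).toString),
    a(4).asInstanceOf[Seq[Row]]                 // struct elements arrive as Rows
      .map(r => (r.getInt(0), r.getString(1)))  // fields a: int, b: string
      .toSet))
}
{code}

This avoids casting to Tuple2 entirely, so the erasure trap illustrated above never fires.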