Done. I also updated the name on the ticket to include both issues: "Spark SQL arrays: explode() fails and cannot save array type to Parquet"
https://issues.apache.org/jira/browse/SPARK-6570 On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian <lian.cs....@gmail.com> wrote: > Forgot to mention that, would you mind to also provide the full stack > trace of the exception thrown in the saveAsParquetFile call? Thanks! > > Cheng > > On 3/27/15 7:35 PM, Jon Chase wrote: > > https://issues.apache.org/jira/browse/SPARK-6570 > > I also left in the call to saveAsParquetFile(), as it produced a similar > exception (though there was no use of explode there). > > On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian <lian.cs....@gmail.com> wrote: > >> This should be a bug in the Explode.eval(), which always assumes the >> underlying SQL array is represented by a Scala Seq. Would you mind to open >> a JIRA ticket for this? Thanks! >> >> Cheng >> >> On 3/27/15 7:00 PM, Jon Chase wrote: >> >> Spark 1.3.0 >> >> Two issues: >> >> a) I'm unable to get a "lateral view explode" query to work on an array >> type >> b) I'm unable to save an array type to a Parquet file >> >> I keep running into this: >> >> java.lang.ClassCastException: [I cannot be cast to >> scala.collection.Seq >> >> Here's a stack trace from the explode issue: >> >> root >> |-- col1: string (nullable = false) >> |-- col2s: array (nullable = true) >> | |-- element: integer (containsNull = true) >> >> ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage >> 1.0 (TID 15) >> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq >> at >> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) >> ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] >> at >> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) >> ~[spark-sql_2.10-1.3.0.jar:1.3.0] >> at >> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) >> ~[spark-sql_2.10-1.3.0.jar:1.3.0] >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> ~[scala-library-2.10.4.jar:na] >> at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> ~[scala-library-2.10.4.jar:na] >> at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> ~[scala-library-2.10.4.jar:na] >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> ~[scala-library-2.10.4.jar:na] >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >> ~[scala-library-2.10.4.jar:na] >> at >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >> ~[scala-library-2.10.4.jar:na] >> at >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> ~[scala-library-2.10.4.jar:na] >> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >> ~[scala-library-2.10.4.jar:na] >> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at org.apache.spark.scheduler.Task.run(Task.scala:64) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >> ~[spark-core_2.10-1.3.0.jar:1.3.0] >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> [na:1.8.0_31] >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> [na:1.8.0_31] >> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] >> WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID >> 15, localhost): java.lang.ClassCastException: [I cannot be cast to >> scala.collection.Seq >> at >> org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) >> at >> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) >> at >> org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) >> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >> at >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >> 
at >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) >> at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> at org.apache.spark.scheduler.Task.run(Task.scala:64) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> >> >> Maybe I'm defining the schema incorrectly? >> >> >> >> This test demonstrates both issues: >> >> @Rule >> public TemporaryFolder tmp = new TemporaryFolder(); >> >> @Test >> public void testPercentileWithExplode() throws Exception { >> StructType schema = DataTypes.createStructType(Lists.newArrayList( >> DataTypes.createStructField("col1", DataTypes.StringType, >> false), >> DataTypes.createStructField("col2s", >> DataTypes.createArrayType(DataTypes.IntegerType, true), true) >> )); >> >> JavaRDD<Row> rowRDD = sc.parallelize(Lists.newArrayList( >> RowFactory.create("test", new int[]{1, 2, 3}) >> )); >> >> DataFrame df = sql.createDataFrame(rowRDD, schema); >> df.registerTempTable("df"); >> df.printSchema(); >> >> List<int[]> ints = sql.sql("select col2s from df").javaRDD() >> .map(row -> (int[]) row.get(0)).collect(); >> assertEquals(1, ints.size()); >> assertArrayEquals(new int[]{1, 2, 3}, ints.get(0)); >> >> >> // fails: lateral view explode does not work: >> java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq >> List<Integer> explodedInts = sql.sql("select col2 from df 
lateral >> view explode(col2s) splode as col2").javaRDD() >> .map(row -> >> row.getInt(0)).collect(); >> assertEquals(3, explodedInts.size()); >> assertEquals(Lists.newArrayList(1, 2, 3), explodedInts); >> >> >> // fails: java.lang.ClassCastException: [I cannot be cast to >> scala.collection.Seq >> df.saveAsParquetFile(tmp.getRoot().getAbsolutePath() + >> "/parquet"); >> >> >> DataFrame loadedDf = sql.load(tmp.getRoot().getAbsolutePath() + >> "/parquet"); >> loadedDf.registerTempTable("loadedDf"); >> List<int[]> moreInts = sql.sql("select col2s from >> loadedDf").javaRDD() >> .map(row -> (int[]) >> row.get(0)).collect(); >> assertEquals(1, moreInts.size()); >> assertArrayEquals(new int[]{1, 2, 3}, moreInts.get(0)); >> } >> >> >> > >