[ https://issues.apache.org/jira/browse/SPARK-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu reassigned SPARK-13390:
------------------------------------

    Assignee: Shixiong Zhu

Java Spark createDataFrame with List parameter bug
--------------------------------------------------

                Key: SPARK-13390
                URL: https://issues.apache.org/jira/browse/SPARK-13390
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
   Affects Versions: 1.6.0
        Environment: Java spark, Linux
           Reporter: mike niemaz
           Assignee: Shixiong Zhu

I noticed the following bug while testing the DataFrame SQL join capabilities.

Instructions to reproduce it (a code sketch of these steps and of the workaround appears after the stack trace below):
- Read a text file from the local file system using the JavaSparkContext#textFile method
- Create a list of related custom objects based on the previously created JavaRDD, using the map function
- Create a DataFrame using the SQLContext#createDataFrame(java.util.List, Class) method
- Count the DataFrame elements using the DataFrame#count method

It crashes with the following stack trace:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#7L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
      +- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row]]
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
    at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1554)
    at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1553)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
    at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1553)
    at injection.EMATests.joinTest1(EMATests.java:259)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
   +- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row]]
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:247)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 46 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L])
+- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row]]
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:164)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 54 more
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 63 more
Caused by: java.io.NotSerializableException: scala.collection.Iterator$$anon$11
Serialization stack:
    - object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator)
    - field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator)
    - object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>)
    - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
    - object (class scala.collection.immutable.Stream$Cons, Stream([TRI1,N,TNW,160000,0006093430000,E,2016-02-01-15.20.31.434000], [TRI2,N,TNW,170000,0006093430000,E,2016-02-01-15.20.31.434000], [TRI3,N,TNW,180000,0006093430000,E,2016-02-01-15.20.31.434000], [TRI4,N,TNW,190000,0006093430000,E,2016-02-01-15.20.31.434000], [CHI1,N,TNY,200000,0006093430000,E,2016-02-01-15.20.31.434000], [CHI2,N,TNY,210000,0006093430000,E,2016-02-01-15.20.31.434000], [CHI3,N,TNY,220000,0006093430000,E,2016-02-01-15.20.31.434000], [CHI4,N,TNY,230000,0006093430000,E,2016-02-01-15.20.31.434000], [CRU1,N,TNY,240000,0006093430000,E,2016-02-01-15.20.31.434000], [CRU2,N,TNY,250000,0006093430000,E,2016-02-01-15.20.31.434000]))
    - field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: $outer, type: class scala.collection.immutable.Stream)
    - object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
    - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0)
    - object (class scala.collection.immutable.Stream$Cons, Stream([empty row], [empty row], [empty row], [empty row], [empty row], [empty row], [empty row], [empty row], [empty row], [empty row]))
    - field (class: org.apache.spark.sql.execution.LocalTableScan, name: rows, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.LocalTableScan, LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row]])
    - field (class: org.apache.spark.sql.execution.aggregate.TungstenAggregate, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.aggregate.TungstenAggregate, TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#10L]) +- LocalTableScan [[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row],[empty row]])
    - field (class: org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.aggregate.TungstenAggregate)
    - object (class org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1, <function0>)
    - field (class: org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1)
    - object (class org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 75 more

A workaround is to call createDataFrame directly on the JavaRDD instead of on a List.
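
For reference, here is a minimal sketch of the reported steps and of the workaround, assuming the Spark 1.6.0 Java API; the Record bean, the input path, and the class names are illustrative placeholders rather than the reporter's actual test code:

import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CreateDataFrameListRepro {

    // Simple serializable Java bean used as the row type (illustrative).
    public static class Record implements Serializable {
        private String value;
        public Record() {}
        public Record(String value) { this.value = value; }
        public String getValue() { return value; }
        public void setValue(String value) { this.value = value; }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SPARK-13390").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read a text file from the local file system and map each line to a custom object.
        JavaRDD<Record> rdd = sc.textFile("/tmp/input.txt").map(
            new Function<String, Record>() {
                @Override
                public Record call(String line) {
                    return new Record(line);
                }
            });

        // Build a DataFrame from a java.util.List of beans via createDataFrame(List, Class).
        List<Record> beans = rdd.collect();
        DataFrame fromList = sqlContext.createDataFrame(beans, Record.class);

        // Counting this DataFrame is the call reported to fail with "Task not serializable" in 1.6.0.
        fromList.count();

        // Workaround: build the DataFrame directly from the JavaRDD instead.
        DataFrame fromRdd = sqlContext.createDataFrame(rdd, Record.class);
        System.out.println(fromRdd.count());

        sc.stop();
    }
}

The JavaRDD-based path avoids the failure, which appears consistent with the serialization stack above: the LocalTableScan's rows field is a lazy Stream that still references a non-serializable Iterator when the plan is shipped to executors.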